import fs from 'node:fs';
import path from 'node:path';
import { Agent } from '@mastra/core/agent';
import { RequestContext } from '@mastra/core/di';
import { Mastra } from '@mastra/core/mastra';
import { TABLE_WORKFLOW_SNAPSHOT, MockStore } from '@mastra/core/storage';
import { createTool } from '@mastra/core/tools';
import type { StreamEvent } from '@mastra/core/workflows';
import { mapVariable } from '@mastra/core/workflows';
import { cloneStep, cloneWorkflow, createStep, createWorkflow } from '@mastra/core/workflows/evented';
import { simulateReadableStream } from 'ai';
import { MockLanguageModelV1 } from 'ai/test';
import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest';
import { z } from 'zod';
import { GoogleCloudPubSub } from '.';

const testStorage = new MockStore();

let testRunIdCounter = 0;

describe.sequential(
  'Workflow',
  () => {
    beforeEach(async () => {
      vi.resetAllMocks();
      testStorage.clearTable({ tableName: TABLE_WORKFLOW_SNAPSHOT });
    });

    describe.sequential('Streaming', () => {
      it('should generate a stream', async () => {
        const step1Action = vi.fn<any>().mockResolvedValue({ result: 'success1' });
        const step2Action = vi.fn<any>().mockResolvedValue({ result: 'success2' });

        const step1 = createStep({
          id: 'step1',
          execute: step1Action,
          inputSchema: z.object({}),
          outputSchema: z.object({ value: z.string() }),
        });
        const step2 = createStep({
          id: 'step2',
          execute: step2Action,
          inputSchema: z.object({ value: z.string() }),
          outputSchema: z.object({}),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
          steps: [step1, step2],
        });
        workflow.then(step1).then(step2).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const runId = `test-run-id-${testRunIdCounter++}`;
        let watchData: StreamEvent[] = [];
        const run = await workflow.createRun({
          runId,
        });

        const { stream, getWorkflowState } = await run.streamAsync({ inputData: {} });

        // Start watching the workflow
        const collectedStreamData: StreamEvent[] = [];
        for await (const data of stream) {
          collectedStreamData.push(JSON.parse(JSON.stringify(data)));
        }
        watchData = collectedStreamData;

        const executionResult = await getWorkflowState();

        expect(watchData.length).toBe(8);
        expect(watchData).toMatchObject([
          {
            payload: {
              runId,
            },
            type: 'start',
          },
          {
            payload: {
              id: 'step1',
              payload: {},
              startedAt: expect.any(Number),
            },
            type: 'step-start',
          },
          {
            payload: {
              id: 'step1',
              output: {
                result: 'success1',
              },
              endedAt: expect.any(Number),
              status: 'success',
            },
            type: 'step-result',
          },
          {
            payload: {
              id: 'step1',
              metadata: {},
            },
            type: 'step-finish',
          },
          {
            payload: {
              id: 'step2',
              payload: {
                result: 'success1',
              },
              startedAt: expect.any(Number),
            },
            type: 'step-start',
          },
          {
            payload: {
              id: 'step2',
              output: {
                result: 'success2',
              },
              endedAt: expect.any(Number),
              status: 'success',
            },
            type: 'step-result',
          },
          {
            payload: {
              id: 'step2',
              metadata: {},
            },
            type: 'step-finish',
          },
          {
            payload: {
              runId,
            },
            type: 'finish',
          },
        ]);
        // Verify execution completed successfully
        expect(executionResult.steps.step1).toEqual({
          status: 'success',
          output: { result: 'success1' },
          payload: {},
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        expect(executionResult.steps.step2).toEqual({
          status: 'success',
          output: { result: 'success2' },
          payload: {
            result: 'success1',
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });

      it('should handle basic suspend and resume flow', async () => {
        const getUserInputAction = vi.fn().mockResolvedValue({ userInput: 'test input' });
        const promptAgentAction = vi
          .fn()
          .mockImplementationOnce(async ({ suspend }) => {
            await suspend();
            return undefined;
          })
          .mockImplementationOnce(() => ({ modelOutput: 'test output' }));
        const evaluateToneAction = vi.fn().mockResolvedValue({
          toneScore: { score: 0.8 },
          completenessScore: { score: 0.7 },
        });
        const improveResponseAction = vi.fn().mockResolvedValue({ improvedOutput: 'improved output' });
        const evaluateImprovedAction = vi.fn().mockResolvedValue({
          toneScore: { score: 0.9 },
          completenessScore: { score: 0.8 },
        });

        const getUserInput = createStep({
          id: 'getUserInput',
          execute: getUserInputAction,
          inputSchema: z.object({ input: z.string() }),
          outputSchema: z.object({ userInput: z.string() }),
        });
        const promptAgent = createStep({
          id: 'promptAgent',
          execute: promptAgentAction,
          inputSchema: z.object({ userInput: z.string() }),
          outputSchema: z.object({ modelOutput: z.string() }),
        });
        const evaluateTone = createStep({
          id: 'evaluateToneConsistency',
          execute: evaluateToneAction,
          inputSchema: z.object({ modelOutput: z.string() }),
          outputSchema: z.object({
            toneScore: z.any(),
            completenessScore: z.any(),
          }),
        });
        const improveResponse = createStep({
          id: 'improveResponse',
          execute: improveResponseAction,
          inputSchema: z.object({ toneScore: z.any(), completenessScore: z.any() }),
          outputSchema: z.object({ improvedOutput: z.string() }),
        });
        const evaluateImproved = createStep({
          id: 'evaluateImprovedResponse',
          execute: evaluateImprovedAction,
          inputSchema: z.object({ improvedOutput: z.string() }),
          outputSchema: z.object({
            toneScore: z.any(),
            completenessScore: z.any(),
          }),
        });

        const promptEvalWorkflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({ input: z.string() }),
          outputSchema: z.object({}),
          steps: [getUserInput, promptAgent, evaluateTone, improveResponse, evaluateImproved],
        });

        promptEvalWorkflow
          .then(getUserInput)
          .then(promptAgent)
          .then(evaluateTone)
          .then(improveResponse)
          .then(evaluateImproved)
          .commit();

        const mastra = new Mastra({
          storage: testStorage,
          workflows: { 'test-workflow': promptEvalWorkflow },
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await promptEvalWorkflow.createRun();

        const { stream, getWorkflowState } = await run.streamAsync({ inputData: { input: 'test' } });

        for await (const data of stream) {
          if (data.type === 'step-suspended') {
            expect(promptAgentAction).toHaveBeenCalledTimes(1);

            // make it async to show that execution is not blocked
            setTimeout(() => {
              const resumeData = { stepId: 'promptAgent', context: { userInput: 'test input for resumption' } };
              run.resume({ resumeData: resumeData as any, step: promptAgent });
            }, 200);
            expect(evaluateToneAction).not.toHaveBeenCalledTimes(1);
          }
        }

        expect(evaluateToneAction).toHaveBeenCalledTimes(1);

        const resumeResult = await getWorkflowState();

        expect(resumeResult.steps).toEqual({
          input: { input: 'test' },
          getUserInput: {
            status: 'success',
            output: { userInput: 'test input' },
            payload: { input: 'test' },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
          promptAgent: {
            status: 'success',
            output: { modelOutput: 'test output' },
            payload: { userInput: 'test input' },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
            suspendPayload: {},
            resumePayload: { stepId: 'promptAgent', context: { userInput: 'test input for resumption' } },
            resumedAt: expect.any(Number),
            suspendedAt: expect.any(Number),
          },
          evaluateToneConsistency: {
            status: 'success',
            output: { toneScore: { score: 0.8 }, completenessScore: { score: 0.7 } },
            payload: { modelOutput: 'test output' },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
          improveResponse: {
            status: 'success',
            output: { improvedOutput: 'improved output' },
            payload: { toneScore: { score: 0.8 }, completenessScore: { score: 0.7 } },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
          evaluateImprovedResponse: {
            status: 'success',
            output: { toneScore: { score: 0.9 }, completenessScore: { score: 0.8 } },
            payload: { improvedOutput: 'improved output' },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
        });
        await mastra.stopEventEngine();
      });

      it('should be able to use an agent as a step', async () => {
        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({
            prompt1: z.string(),
            prompt2: z.string(),
          }),
          outputSchema: z.object({}),
        });

        const agent = new Agent({
          id: 'test-agent-1',
          name: 'test-agent-1',
          instructions: 'test agent instructions"',
          model: new MockLanguageModelV1({
            doStream: async () => ({
              stream: simulateReadableStream({
                chunks: [
                  { type: 'text-delta', textDelta: 'Paris' },
                  {
                    type: 'finish',
                    finishReason: 'stop',
                    logprobs: undefined,
                    usage: { completionTokens: 10, promptTokens: 3 },
                  },
                ],
              }),
              rawCall: { rawPrompt: null, rawSettings: {} },
            }),
          }),
        });

        const agent2 = new Agent({
          id: 'test-agent-2',
          name: 'test-agent-2',
          instructions: 'test agent instructions',
          model: new MockLanguageModelV1({
            doStream: async () => ({
              stream: simulateReadableStream({
                chunks: [
                  { type: 'text-delta', textDelta: 'London' },
                  {
                    type: 'finish',
                    finishReason: 'stop',
                    logprobs: undefined,
                    usage: { completionTokens: 10, promptTokens: 3 },
                  },
                ],
              }),
              rawCall: { rawPrompt: null, rawSettings: {} },
            }),
          }),
        });

        const startStep = createStep({
          id: 'start',
          inputSchema: z.object({
            prompt1: z.string(),
            prompt2: z.string(),
          }),
          outputSchema: z.object({ prompt1: z.string(), prompt2: z.string() }),
          execute: async ({ inputData }) => {
            return {
              prompt1: inputData.prompt1,
              prompt2: inputData.prompt2,
            };
          },
        });

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          agents: { 'test-agent-1': agent, 'test-agent-2': agent2 },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const agentStep1 = createStep(agent);
        const agentStep2 = createStep(agent2);

        workflow
          .then(startStep)
          .map({
            prompt: {
              step: startStep,
              path: 'prompt1',
            },
          })
          .then(agentStep1)
          .map({
            prompt: {
              step: startStep,
              path: 'prompt2',
            },
          })
          .then(agentStep2)
          .commit();

        const runId = `test-run-id-${testRunIdCounter++}`;
        const run = await workflow.createRun({
          runId,
        });
        const { stream } = await run.streamAsync({
          inputData: {
            prompt1: 'Capital of France, just the name',
            prompt2: 'Capital of UK, just the name',
          },
        });

        const values: StreamEvent[] = [];
        for await (const value of stream.values()) {
          values.push(value);
        }

        expect(values).toMatchObject([
          {
            payload: {
              runId,
            },
            type: 'start',
          },
          {
            payload: {
              id: 'start',
              payload: {
                prompt1: 'Capital of France, just the name',
                prompt2: 'Capital of UK, just the name',
              },
              startedAt: expect.any(Number),
            },
            type: 'step-start',
          },
          {
            payload: {
              id: 'start',
              output: {
                prompt1: 'Capital of France, just the name',
                prompt2: 'Capital of UK, just the name',
              },
              endedAt: expect.any(Number),
              status: 'success',
            },
            type: 'step-result',
          },
          {
            payload: {
              id: 'start',
              metadata: {},
            },
            type: 'step-finish',
          },
          {
            payload: {
              id: expect.any(String),
              payload: {
                prompt1: 'Capital of France, just the name',
                prompt2: 'Capital of UK, just the name',
              },
              startedAt: expect.any(Number),
            },
            type: 'step-start',
          },
          {
            payload: {
              id: expect.any(String),
              endedAt: expect.any(Number),
              output: {
                prompt: 'Capital of France, just the name',
              },
              status: 'success',
            },
            type: 'step-result',
          },
          {
            payload: {
              id: expect.any(String),
              metadata: {},
            },
            type: 'step-finish',
          },
          {
            payload: {
              id: 'test-agent-1',
              payload: {
                prompt: 'Capital of France, just the name',
              },
              startedAt: expect.any(Number),
            },
            type: 'step-start',
          },
          {
            args: {
              prompt: 'Capital of France, just the name',
            },
            name: 'test-agent-1',
            type: 'tool-call-streaming-start',
          },
          {
            args: {
              prompt: 'Capital of France, just the name',
            },
            argsTextDelta: 'Paris',
            name: 'test-agent-1',
            type: 'tool-call-delta',
          },
          {
            payload: {
              id: 'test-agent-1',
              output: {
                text: 'Paris',
              },
              endedAt: expect.any(Number),
              status: 'success',
            },
            type: 'step-result',
          },
          {
            payload: {
              id: 'test-agent-1',
              metadata: {},
            },
            type: 'step-finish',
          },
          {
            payload: {
              id: expect.any(String),
              payload: {
                text: 'Paris',
              },
              startedAt: expect.any(Number),
            },
            type: 'step-start',
          },
          {
            payload: {
              id: expect.any(String),
              endedAt: expect.any(Number),
              output: {
                prompt: 'Capital of UK, just the name',
              },
              status: 'success',
            },
            type: 'step-result',
          },
          {
            payload: {
              id: expect.any(String),
              metadata: {},
            },
            type: 'step-finish',
          },
          {
            payload: {
              id: 'test-agent-2',
              payload: {
                prompt: 'Capital of UK, just the name',
              },
              startedAt: expect.any(Number),
            },
            type: 'step-start',
          },
          {
            args: {
              prompt: 'Capital of UK, just the name',
            },
            name: 'test-agent-2',
            type: 'tool-call-streaming-start',
          },
          {
            args: {
              prompt: 'Capital of UK, just the name',
            },
            argsTextDelta: 'London',
            name: 'test-agent-2',
            type: 'tool-call-delta',
          },
          {
            payload: {
              id: 'test-agent-2',
              endedAt: expect.any(Number),
              output: {
                text: 'London',
              },
              status: 'success',
            },
            type: 'step-result',
          },
          {
            payload: {
              id: 'test-agent-2',
              metadata: {},
            },
            type: 'step-finish',
          },
          {
            payload: {
              runId,
            },
            type: 'finish',
          },
        ]);
        await mastra.stopEventEngine();
      });

      it('should handle sleep waiting flow', async () => {
        const step1Action = vi.fn<any>().mockResolvedValue({ result: 'success1' });
        const step2Action = vi.fn<any>().mockResolvedValue({ result: 'success2' });

        const step1 = createStep({
          id: 'step1',
          execute: step1Action,
          inputSchema: z.object({}),
          outputSchema: z.object({ value: z.string() }),
        });
        const step2 = createStep({
          id: 'step2',
          execute: step2Action,
          inputSchema: z.object({ value: z.string() }),
          outputSchema: z.object({}),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
          steps: [step1, step2],
        });
        workflow.then(step1).sleep(1000).then(step2).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();
        await new Promise(resolve => setTimeout(resolve, 1000));

        const runId = `test-run-id-${testRunIdCounter++}`;
        let watchData: StreamEvent[] = [];
        const run = await workflow.createRun({
          runId,
        });

        const { stream, getWorkflowState } = await run.streamAsync({ inputData: {} });

        // Start watching the workflow
        const collectedStreamData: StreamEvent[] = [];
        for await (const data of stream) {
          collectedStreamData.push(JSON.parse(JSON.stringify(data)));
        }
        watchData = collectedStreamData;

        const executionResult = await getWorkflowState();

        expect(watchData.length).toBe(11);
        expect(watchData).toMatchObject([
          {
            payload: {
              runId,
            },
            type: 'start',
          },
          {
            payload: {
              id: 'step1',
              startedAt: expect.any(Number),
              status: 'running',
              payload: {},
            },
            type: 'step-start',
          },
          {
            payload: {
              id: 'step1',
              output: {
                result: 'success1',
              },
              endedAt: expect.any(Number),
              status: 'success',
            },
            type: 'step-result',
          },
          {
            payload: {
              id: 'step1',
              metadata: {},
            },
            type: 'step-finish',
          },
          {
            payload: {
              id: expect.any(String),
              startedAt: expect.any(Number),
              status: 'waiting',
              payload: {
                result: 'success1',
              },
            },
            type: 'step-waiting',
          },
          {
            payload: {
              id: expect.any(String),
              endedAt: expect.any(Number),
              startedAt: expect.any(Number),
              status: 'success',
              output: {
                result: 'success1',
              },
            },
            type: 'step-result',
          },
          {
            type: 'step-finish',
            payload: {
              id: expect.any(String),
              metadata: {},
            },
          },
          {
            payload: {
              id: 'step2',
              payload: {
                result: 'success1',
              },
              startedAt: expect.any(Number),
              status: 'running',
            },
            type: 'step-start',
          },
          {
            payload: {
              id: 'step2',
              output: {
                result: 'success2',
              },
              endedAt: expect.any(Number),
              status: 'success',
            },
            type: 'step-result',
          },
          {
            payload: {
              id: 'step2',
              metadata: {},
            },
            type: 'step-finish',
          },
          {
            payload: {
              runId,
            },
            type: 'finish',
          },
        ]);
        // Verify execution completed successfully
        expect(executionResult.steps.step1).toEqual({
          status: 'success',
          output: { result: 'success1' },
          payload: {},
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        expect(executionResult.steps.step2).toEqual({
          status: 'success',
          output: { result: 'success2' },
          payload: {
            result: 'success1',
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('Basic Workflow Execution', () => {
      it('should be able to bail workflow execution', async () => {
        const step1 = createStep({
          id: 'step1',
          execute: async ({ bail, inputData }) => {
            if (inputData.value === 'bail') {
              return bail({ result: 'bailed' });
            }

            return { result: 'step1: ' + inputData.value };
          },
          inputSchema: z.object({ value: z.string() }),
          outputSchema: z.object({ result: z.string() }),
        });
        const step2 = createStep({
          id: 'step2',
          execute: async ({ inputData }) => {
            return { result: 'step2: ' + inputData.result };
          },
          inputSchema: z.object({ result: z.string() }),
          outputSchema: z.object({ result: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({
            result: z.string(),
          }),
          steps: [step1, step2],
        });

        workflow.then(step1).then(step2).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const result = await run.start({ inputData: { value: 'bail' } });

        expect(result.steps['step1']).toEqual({
          status: 'success',
          output: { result: 'bailed' },
          payload: { value: 'bail' },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });

        expect(result.steps['step2']).toBeUndefined();

        const run2 = await workflow.createRun();
        const result2 = await run2.start({ inputData: { value: 'no-bail' } });

        expect(result2.steps['step1']).toEqual({
          status: 'success',
          output: { result: 'step1: no-bail' },
          payload: { value: 'no-bail' },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });

        expect(result2.steps['step2']).toEqual({
          status: 'success',
          output: { result: 'step2: step1: no-bail' },
          payload: { result: 'step1: no-bail' },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });

      it('should throw error when execution flow not defined', () => {
        const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
        const step1 = createStep({
          id: 'step1',
          execute,
          inputSchema: z.object({}),
          outputSchema: z.object({ result: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({
            result: z.string(),
          }),
          steps: [step1],
        });

        expect(() => workflow.createRun()).toThrowError(
          'Execution flow of workflow is not defined. Add steps to the workflow via .then(), .branch(), etc.',
        );
      });

      it('should throw error when execution graph is not committed', () => {
        const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
        const step1 = createStep({
          id: 'step1',
          execute,
          inputSchema: z.object({}),
          outputSchema: z.object({ result: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({
            result: z.string(),
          }),
          steps: [step1],
        });

        workflow.then(step1);

        expect(() => workflow.createRun()).toThrowError(
          'Uncommitted step flow changes detected. Call .commit() to register the steps.',
        );
      });

      it('should execute a single step workflow successfully', async () => {
        const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
        const step1 = createStep({
          id: 'step1',
          execute,
          inputSchema: z.object({}),
          outputSchema: z.object({ result: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({
            result: z.string(),
          }),
          steps: [step1],
        });

        workflow.then(step1).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const result = await run.start({ inputData: {} });

        expect(execute).toHaveBeenCalled();
        expect(result.steps['step1']).toEqual({
          status: 'success',
          output: { result: 'success' },
          payload: {},
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });

      it('should have access to typed workflow results', async () => {
        const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
        const step1 = createStep({
          id: 'step1',
          execute,
          inputSchema: z.object({}),
          suspendSchema: z.object({ hello: z.string() }).strict(),
          resumeSchema: z.object({ resumeInfo: z.object({ hello: z.string() }).strict() }),
          outputSchema: z.object({ result: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({
            result: z.string(),
          }),
          steps: [step1],
        });

        workflow.then(step1).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const result = await run.start({ inputData: {} });

        expect(execute).toHaveBeenCalled();
        expect(result.steps['step1']).toEqual({
          status: 'success',
          output: { result: 'success' },
          payload: {},
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });

      it('should execute multiple steps in parallel', async () => {
        const step1Action = vi.fn().mockImplementation(async () => {
          return { value: 'step1' };
        });
        const step2Action = vi.fn().mockImplementation(async () => {
          return { value: 'step2' };
        });

        const step1 = createStep({
          id: 'step1',
          execute: step1Action,
          inputSchema: z.object({}),
          outputSchema: z.object({ value: z.string() }),
        });
        const step2 = createStep({
          id: 'step2',
          execute: step2Action,
          inputSchema: z.object({}),
          outputSchema: z.object({ value: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({ value: z.string() }),
          steps: [step1, step2],
        });

        workflow.parallel([step1, step2]).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const result = await run.start({ inputData: {} });

        expect(step1Action).toHaveBeenCalled();
        expect(step2Action).toHaveBeenCalled();
        expect(result.steps).toEqual({
          input: {},
          step1: {
            status: 'success',
            output: { value: 'step1' },
            payload: {},

            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
          step2: {
            status: 'success',
            output: { value: 'step2' },
            payload: {},

            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
        });
        await mastra.stopEventEngine();
      });

      it('should have runId in the step execute function - bug #4260', async () => {
        const step1Action = vi.fn().mockImplementation(({ runId }) => {
          return { value: runId };
        });

        const step1 = createStep({
          id: 'step1',
          execute: step1Action,
          inputSchema: z.object({}),
          outputSchema: z.object({ value: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({ value: z.string() }),
          steps: [step1],
        });

        workflow.then(step1).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const result = await run.start({ inputData: {} });

        expect(result.steps).toEqual({
          input: {},
          step1: {
            status: 'success',
            output: { value: run.runId },
            payload: {},

            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
        });
        await mastra.stopEventEngine();
      });

      describe.sequential('Variable Resolution', () => {
        it('should resolve trigger data', async () => {
          const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });

          const step1 = createStep({
            id: 'step1',
            execute,
            inputSchema: z.object({ inputData: z.string() }),
            outputSchema: z.object({ result: z.string() }),
          });
          const step2 = createStep({
            id: 'step2',
            execute,
            inputSchema: z.object({ result: z.string() }),
            outputSchema: z.object({ result: z.string() }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: z.object({ inputData: z.string() }),
            outputSchema: z.object({}),
          });

          workflow.then(step1).then(step2).commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          const result = await run.start({ inputData: { inputData: 'test-input' } });

          expect(result.steps.step1).toEqual({
            status: 'success',
            output: { result: 'success' },
            payload: {
              inputData: 'test-input',
            },

            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          expect(result.steps.step2).toEqual({
            status: 'success',
            output: { result: 'success' },
            payload: { result: 'success' },

            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          await mastra.stopEventEngine();
        });

        it('should provide access to step results and trigger data via getStepResult helper', async () => {
          const step1Action = vi.fn().mockImplementation(async ({ inputData }) => {
            // Test accessing trigger data with correct type
            expect(inputData).toEqual({ inputValue: 'test-input' });
            return { value: 'step1-result' };
          });

          const step2Action = vi.fn().mockImplementation(async ({ getStepResult }) => {
            // Test accessing previous step result with type
            const step1Result = getStepResult(step1);
            expect(step1Result).toEqual({ value: 'step1-result' });

            const failedStep = getStepResult(nonExecutedStep);
            expect(failedStep).toBe(null);

            return { value: 'step2-result' };
          });

          const step1 = createStep({
            id: 'step1',
            execute: step1Action,
            inputSchema: z.object({ inputValue: z.string() }),
            outputSchema: z.object({ value: z.string() }),
          });
          const step2 = createStep({
            id: 'step2',
            execute: step2Action,
            inputSchema: z.object({ value: z.string() }),
            outputSchema: z.object({ value: z.string() }),
          });

          const nonExecutedStep = createStep({
            id: 'non-executed-step',
            execute: vi.fn(),
            inputSchema: z.object({}),
            outputSchema: z.object({}),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: z.object({ inputValue: z.string() }),
            outputSchema: z.object({ value: z.string() }),
          });

          workflow.then(step1).then(step2).commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          const result = await run.start({ inputData: { inputValue: 'test-input' } });

          expect(step1Action).toHaveBeenCalled();
          expect(step2Action).toHaveBeenCalled();
          expect(result.steps).toEqual({
            input: { inputValue: 'test-input' },
            step1: {
              status: 'success',
              output: { value: 'step1-result' },
              payload: {
                inputValue: 'test-input',
              },

              startedAt: expect.any(Number),
              endedAt: expect.any(Number),
            },
            step2: {
              status: 'success',
              output: { value: 'step2-result' },
              payload: {
                value: 'step1-result',
              },

              startedAt: expect.any(Number),
              endedAt: expect.any(Number),
            },
          });
          await mastra.stopEventEngine();
        });

        it('should resolve trigger data from context', async () => {
          const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
          const triggerSchema = z.object({
            inputData: z.string(),
          });

          const step1 = createStep({
            id: 'step1',
            execute,
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string() }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string() }),
          });

          workflow.then(step1).commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          await run.start({ inputData: { inputData: 'test-input' } });

          expect(execute).toHaveBeenCalledWith(
            expect.objectContaining({
              inputData: { inputData: 'test-input' },
            }),
          );
          await mastra.stopEventEngine();
        });

        it('should resolve trigger data from getInitData', async () => {
          const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
          const triggerSchema = z.object({
            cool: z.string(),
          });

          const step1 = createStep({
            id: 'step1',
            execute,
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string() }),
          });

          const step2 = createStep({
            id: 'step2',
            execute: async ({ getInitData }) => {
              const initData = getInitData<typeof triggerSchema>();
              return { result: initData };
            },
            inputSchema: z.object({ result: z.string() }),
            outputSchema: z.object({ result: z.object({ cool: z.string() }) }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string() }),
            steps: [step1, step2],
          });

          workflow.then(step1).then(step2).commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          const result = await run.start({ inputData: { cool: 'test-input' } });

          expect(execute).toHaveBeenCalledWith(
            expect.objectContaining({
              inputData: { cool: 'test-input' },
            }),
          );

          expect(result.steps.step2).toEqual({
            status: 'success',
            output: { result: { cool: 'test-input' } },
            payload: {
              result: 'success',
            },

            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          await mastra.stopEventEngine();
        });

        it('should resolve trigger data from getInitData with workflow schema', async () => {
          const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
          const triggerSchema = z.object({
            cool: z.string(),
          });

          const step1 = createStep({
            id: 'step1',
            execute,
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string() }),
          });

          const step2 = createStep({
            id: 'step2',
            execute: async ({ getInitData }) => {
              const initData = getInitData<typeof workflow>();
              return { result: initData };
            },
            inputSchema: z.object({ result: z.string() }),
            outputSchema: z.object({ result: z.object({ cool: z.string() }) }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string() }),
          });

          workflow.then(step1).then(step2).commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          const result = await run.start({ inputData: { cool: 'test-input' } });

          expect(execute).toHaveBeenCalledWith(
            expect.objectContaining({
              inputData: { cool: 'test-input' },
            }),
          );

          expect(result.steps.step2).toEqual({
            status: 'success',
            output: { result: { cool: 'test-input' } },
            payload: { result: 'success' },

            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          await mastra.stopEventEngine();
        });

        it('should resolve trigger data and DI requestContext values via .map()', async () => {
          const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
          const triggerSchema = z.object({
            cool: z.string(),
          });

          const step1 = createStep({
            id: 'step1',
            execute,
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string() }),
          });

          const step2 = createStep({
            id: 'step2',
            execute: async ({ inputData }) => {
              return { result: inputData.test, second: inputData.test2 };
            },
            inputSchema: z.object({ test: z.string(), test2: z.number() }),
            outputSchema: z.object({ result: z.string(), second: z.number() }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string(), second: z.number() }),
          });

          workflow
            .then(step1)
            .map({
              test: mapVariable({
                initData: workflow,
                path: 'cool',
              }),
              test2: {
                requestContextPath: 'life',
                schema: z.number(),
              },
            })
            .then(step2)
            .commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const requestContext = new RequestContext<{ life: number }>();
          requestContext.set('life', 42);

          const run = await workflow.createRun();
          const result = await run.start({ inputData: { cool: 'test-input' }, requestContext });

          expect(execute).toHaveBeenCalledWith(
            expect.objectContaining({
              inputData: { cool: 'test-input' },
            }),
          );

          expect(result.steps.step2).toEqual({
            status: 'success',
            output: { result: 'test-input', second: 42 },
            payload: { test: 'test-input', test2: 42 },

            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          await mastra.stopEventEngine();
        });

        it('should resolve dynamic mappings via .map()', async () => {
          const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
          const triggerSchema = z.object({
            cool: z.string(),
          });

          const step1 = createStep({
            id: 'step1',
            execute,
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string() }),
          });

          const step2 = createStep({
            id: 'step2',
            execute: async ({ inputData }) => {
              return { result: inputData.test, second: inputData.test2 };
            },
            inputSchema: z.object({ test: z.string(), test2: z.string() }),
            outputSchema: z.object({ result: z.string(), second: z.string() }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string(), second: z.string() }),
          });

          workflow
            .then(step1)
            .map({
              test: mapVariable({
                initData: workflow,
                path: 'cool',
              }),
              test2: {
                schema: z.string(),
                fn: async ({ inputData }) => {
                  return 'Hello ' + inputData.result;
                },
              },
            })
            .then(step2)
            .map({
              result: mapVariable({
                step: step2,
                path: 'result',
              }),
              second: {
                schema: z.string(),
                fn: async ({ getStepResult }) => {
                  return getStepResult(step1).result;
                },
              },
            })
            .commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          const result = await run.start({ inputData: { cool: 'test-input' } });

          if (result.status !== 'success') {
            expect.fail('Workflow should have succeeded');
          }

          expect(execute).toHaveBeenCalledWith(
            expect.objectContaining({
              inputData: { cool: 'test-input' },
            }),
          );

          expect(result.steps.step2).toEqual({
            status: 'success',
            output: { result: 'test-input', second: 'Hello success' },
            payload: { test: 'test-input', test2: 'Hello success' },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });

          expect(result.result).toEqual({
            result: 'test-input',
            second: 'success',
          });
          await mastra.stopEventEngine();
        });

        it('should resolve variables from previous steps', async () => {
          const step1Action = vi.fn<any>().mockResolvedValue({
            nested: { value: 'step1-data' },
          });
          const step2Action = vi.fn<any>().mockResolvedValue({ result: 'success' });

          const step1 = createStep({
            id: 'step1',
            execute: step1Action,
            inputSchema: z.object({}),
            outputSchema: z.object({ nested: z.object({ value: z.string() }) }),
          });
          const step2 = createStep({
            id: 'step2',
            execute: step2Action,
            inputSchema: z.object({ previousValue: z.string() }),
            outputSchema: z.object({ result: z.string() }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: z.object({}),
            outputSchema: z.object({ result: z.string() }),
          });

          workflow
            .then(step1)
            .map({
              previousValue: mapVariable({
                step: step1,
                path: 'nested.value',
              }),
            })
            .then(step2)
            .commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          await run.start({ inputData: {} });

          expect(step2Action).toHaveBeenCalledWith(
            expect.objectContaining({
              inputData: {
                previousValue: 'step1-data',
              },
            }),
          );
          await mastra.stopEventEngine();
        });

        it('should resolve inputs from previous steps that are not objects', async () => {
          const step1 = createStep({
            id: 'step1',
            execute: async () => {
              return 'step1-data';
            },
            inputSchema: z.object({}),
            outputSchema: z.string(),
          });
          const step2 = createStep({
            id: 'step2',
            execute: async ({ inputData }) => {
              return { result: 'success', input: inputData };
            },
            inputSchema: z.string(),
            outputSchema: z.object({ result: z.string(), input: z.string() }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: z.object({}),
            outputSchema: z.object({ result: z.string() }),
          });

          workflow.then(step1).then(step2).commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          const result = await run.start({ inputData: {} });

          expect(result.steps).toEqual({
            input: {},
            step1: {
              status: 'success',
              output: 'step1-data',
              payload: {},

              startedAt: expect.any(Number),
              endedAt: expect.any(Number),
            },
            step2: {
              status: 'success',
              output: { result: 'success', input: 'step1-data' },
              payload: 'step1-data',

              startedAt: expect.any(Number),
              endedAt: expect.any(Number),
            },
          });
          await mastra.stopEventEngine();
        });

        it('should resolve inputs from previous steps that are arrays', async () => {
          const step1 = createStep({
            id: 'step1',
            execute: async () => {
              return [{ str: 'step1-data' }];
            },
            inputSchema: z.object({}),
            outputSchema: z.array(z.object({ str: z.string() })),
          });
          const step2 = createStep({
            id: 'step2',
            execute: async ({ inputData }) => {
              return { result: 'success', input: inputData };
            },
            inputSchema: z.array(z.object({ str: z.string() })),
            outputSchema: z.object({ result: z.string(), input: z.array(z.object({ str: z.string() })) }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: z.object({}),
            outputSchema: z.object({ result: z.string() }),
          });

          workflow.then(step1).then(step2).commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          const result = await run.start({ inputData: {} });

          expect(result.steps).toEqual({
            input: {},
            step1: {
              status: 'success',
              output: [{ str: 'step1-data' }],
              payload: {},

              startedAt: expect.any(Number),
              endedAt: expect.any(Number),
            },
            step2: {
              status: 'success',
              output: { result: 'success', input: [{ str: 'step1-data' }] },
              payload: [{ str: 'step1-data' }],

              startedAt: expect.any(Number),
              endedAt: expect.any(Number),
            },
          });
          await mastra.stopEventEngine();
        });

        it('should resolve inputs from previous steps that are arrays via .map()', async () => {
          const step1 = createStep({
            id: 'step1',
            execute: async () => {
              return [{ str: 'step1-data' }];
            },
            inputSchema: z.object({}),
            outputSchema: z.array(z.object({ str: z.string() })),
          });
          const step2 = createStep({
            id: 'step2',
            execute: async ({ inputData }) => {
              return { result: 'success', input: inputData.ary };
            },
            inputSchema: z.object({ ary: z.array(z.object({ str: z.string() })) }),
            outputSchema: z.object({ result: z.string(), input: z.array(z.object({ str: z.string() })) }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: z.object({}),
            outputSchema: z.object({ result: z.string() }),
          });

          workflow
            .then(step1)
            .map({
              ary: mapVariable({
                step: step1,
                path: '.',
              }),
            })
            .then(step2)
            .commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          const result = await run.start({ inputData: {} });

          expect(result.steps).toMatchObject({
            input: {},
            step1: {
              status: 'success',
              output: [{ str: 'step1-data' }],
              payload: {},

              startedAt: expect.any(Number),
              endedAt: expect.any(Number),
            },
            step2: {
              status: 'success',
              output: { result: 'success', input: [{ str: 'step1-data' }] },
              payload: { ary: [{ str: 'step1-data' }] },

              startedAt: expect.any(Number),
              endedAt: expect.any(Number),
            },
          });
          await mastra.stopEventEngine();
        });

        it('should resolve constant values via .map()', async () => {
          const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
          const triggerSchema = z.object({
            cool: z.string(),
          });

          const step1 = createStep({
            id: 'step1',
            execute,
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string() }),
          });

          const step2 = createStep({
            id: 'step2',
            execute: async ({ inputData }) => {
              return { result: inputData.candidates.map(c => c.name).join('') || 'none', second: inputData.iteration };
            },
            inputSchema: z.object({ candidates: z.array(z.object({ name: z.string() })), iteration: z.number() }),
            outputSchema: z.object({ result: z.string(), second: z.number() }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string(), second: z.number() }),
          });

          workflow
            .then(step1)
            .map({
              candidates: {
                value: [],
                schema: z.array(z.object({ name: z.string() })),
              },
              iteration: {
                value: 0,
                schema: z.number(),
              },
            })
            .then(step2)
            .commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          const result = await run.start({ inputData: { cool: 'test-input' } });

          expect(execute).toHaveBeenCalledWith(
            expect.objectContaining({
              inputData: { cool: 'test-input' },
            }),
          );

          expect(result.steps.step2).toEqual({
            status: 'success',
            output: { result: 'none', second: 0 },
            payload: { candidates: [], iteration: 0 },

            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          await mastra.stopEventEngine();
        });

        it('should resolve fully dynamic input via .map()', async () => {
          const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
          const triggerSchema = z.object({
            cool: z.string(),
          });

          const step1 = createStep({
            id: 'step1',
            execute,
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string() }),
          });

          const step2 = createStep({
            id: 'step2',
            execute: async ({ inputData }) => {
              return {
                result: inputData.candidates.map(c => c.name).join(', ') || 'none',
                second: inputData.iteration,
              };
            },
            inputSchema: z.object({ candidates: z.array(z.object({ name: z.string() })), iteration: z.number() }),
            outputSchema: z.object({ result: z.string(), second: z.number() }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: triggerSchema,
            outputSchema: z.object({ result: z.string(), second: z.number() }),
          });

          workflow
            .then(step1)
            .map(async ({ inputData }) => {
              return {
                candidates: [{ name: inputData.result }, { name: 'hello' }],
                iteration: 0,
              };
            })
            .then(step2)
            .commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          const result = await run.start({ inputData: { cool: 'test-input' } });

          expect(execute).toHaveBeenCalledWith(
            expect.objectContaining({
              inputData: { cool: 'test-input' },
            }),
          );

          expect(result.steps.step2).toEqual({
            status: 'success',
            output: { result: 'success, hello', second: 0 },
            payload: { candidates: [{ name: 'success' }, { name: 'hello' }], iteration: 0 },

            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          await mastra.stopEventEngine();
        });
      });

      describe.sequential('Simple Conditions', () => {
        it('should follow conditional chains', async () => {
          const step1Action = vi.fn().mockImplementation(() => {
            return Promise.resolve({ status: 'success' });
          });
          const step2Action = vi.fn().mockImplementation(() => {
            return Promise.resolve({ result: 'step2' });
          });
          const step3Action = vi.fn().mockImplementation(() => {
            return Promise.resolve({ result: 'step3' });
          });

          const step1 = createStep({
            id: 'step1',
            execute: step1Action,
            inputSchema: z.object({ status: z.string() }),
            outputSchema: z.object({ status: z.string() }),
          });
          const step2 = createStep({
            id: 'step2',
            execute: step2Action,
            inputSchema: z.object({ status: z.string() }),
            outputSchema: z.object({ result: z.string() }),
          });
          const step3 = createStep({
            id: 'step3',
            execute: step3Action,
            inputSchema: z.object({ status: z.string() }),
            outputSchema: z.object({ result: z.string() }),
          });
          const step4 = createStep({
            id: 'step4',
            execute: async ({ inputData }) => {
              return { result: inputData.result };
            },
            inputSchema: z.object({ result: z.string() }),
            outputSchema: z.object({ result: z.string() }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: z.object({ status: z.string() }),
            outputSchema: z.object({ result: z.string() }),
            steps: [step1, step2, step3],
          });

          workflow
            .then(step1)
            .branch([
              [
                async ({ inputData }) => {
                  return inputData.status === 'success';
                },
                step2,
              ],
              [
                async ({ inputData }) => {
                  return inputData.status === 'failed';
                },
                step3,
              ],
            ])
            .map({
              result: {
                step: [step3, step2],
                path: 'result',
              },
            })
            .then(step4)
            .commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          const result = await run.start({ inputData: { status: 'success' } });

          expect(step1Action).toHaveBeenCalled();
          expect(step2Action).toHaveBeenCalled();
          expect(step3Action).not.toHaveBeenCalled();
          expect(result.steps).toMatchObject({
            input: { status: 'success' },
            step1: { status: 'success', output: { status: 'success' } },
            step2: { status: 'success', output: { result: 'step2' } },
            step4: { status: 'success', output: { result: 'step2' } },
          });
          await mastra.stopEventEngine();
        });

        it('should handle failing dependencies', async () => {
          let err: Error | undefined;
          const step1Action = vi.fn<any>().mockImplementation(() => {
            err = new Error('Failed');
            throw err;
          });
          const step2Action = vi.fn<any>();

          const step1 = createStep({
            id: 'step1',
            execute: step1Action,
            inputSchema: z.object({}),
            outputSchema: z.object({}),
          });
          const step2 = createStep({
            id: 'step2',
            execute: step2Action,
            inputSchema: z.object({}),
            outputSchema: z.object({}),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: z.object({}),
            outputSchema: z.object({}),
            steps: [step1, step2],
          });

          workflow.then(step1).then(step2).commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          let result: Awaited<ReturnType<typeof run.start>> | undefined = undefined;
          try {
            result = await run.start({ inputData: {} });
          } catch {
            // do nothing
          }

          expect(step1Action).toHaveBeenCalled();
          expect(step2Action).not.toHaveBeenCalled();
          expect((result?.steps as any)?.input).toEqual({});

          const step1Result = result?.steps?.step1;
          expect(step1Result).toBeDefined();
          expect(step1Result).toMatchObject({
            status: 'failed',
            payload: {},
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          expect((step1Result as any)?.error).toMatch(/^Error: Failed/);
          await mastra.stopEventEngine();
        });

        it('should support simple string conditions', async () => {
          const step1Action = vi.fn<any>().mockResolvedValue({ status: 'success' });
          const step2Action = vi.fn<any>().mockResolvedValue({ result: 'step2' });
          const step3Action = vi.fn<any>().mockResolvedValue({ result: 'step3' });
          const step1 = createStep({
            id: 'step1',
            execute: step1Action,
            inputSchema: z.object({}),
            outputSchema: z.object({ status: z.string() }),
          });
          const step2 = createStep({
            id: 'step2',
            execute: step2Action,
            inputSchema: z.object({ status: z.string() }),
            outputSchema: z.object({ result: z.string() }),
          });
          const step3 = createStep({
            id: 'step3',
            execute: step3Action,
            inputSchema: z.object({ result: z.string() }),
            outputSchema: z.object({ result: z.string() }),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: z.object({}),
            outputSchema: z.object({}),
            steps: [step1, step2, step3],
          });
          workflow
            .then(step1)
            .branch([
              [
                async ({ inputData }) => {
                  return inputData.status === 'success';
                },
                step2,
              ],
            ])
            .map({
              result: {
                step: step2,
                path: 'result',
              },
            })
            .branch([
              [
                async ({ inputData }) => {
                  return inputData.result === 'unexpected value';
                },
                step3,
              ],
            ])
            .commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          const result = await run.start({ inputData: { status: 'success' } });

          expect(step1Action).toHaveBeenCalled();
          expect(step2Action).toHaveBeenCalled();
          expect(step3Action).not.toHaveBeenCalled();
          expect(result.steps).toMatchObject({
            input: { status: 'success' },
            step1: {
              status: 'success',
              output: { status: 'success' },
              payload: {},

              startedAt: expect.any(Number),
              endedAt: expect.any(Number),
            },
            step2: {
              status: 'success',
              output: { result: 'step2' },
              payload: { status: 'success' },

              startedAt: expect.any(Number),
              endedAt: expect.any(Number),
            },
          });
          await mastra.stopEventEngine();
        });

        it('should support custom condition functions', async () => {
          const step1Action = vi.fn<any>().mockResolvedValue({ count: 5 });
          const step2Action = vi.fn<any>();

          const step1 = createStep({
            id: 'step1',
            execute: step1Action,
            inputSchema: z.object({}),
            outputSchema: z.object({ count: z.number() }),
          });
          const step2 = createStep({
            id: 'step2',
            execute: step2Action,
            inputSchema: z.object({ count: z.number() }),
            outputSchema: z.object({}),
          });

          const workflow = createWorkflow({
            id: 'test-workflow',
            inputSchema: z.object({}),
            outputSchema: z.object({}),
          });

          workflow
            .then(step1)
            .branch([
              [
                async ({ getStepResult }) => {
                  const step1Result = getStepResult(step1);

                  return step1Result ? step1Result.count > 3 : false;
                },
                step2,
              ],
            ])
            .commit();

          const mastra = new Mastra({
            workflows: { 'test-workflow': workflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = await workflow.createRun();
          const result = await run.start({ inputData: { count: 5 } });

          expect(step2Action).toHaveBeenCalled();
          expect(result.steps.step1).toEqual({
            status: 'success',
            output: { count: 5 },
            payload: { count: 5 },

            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          expect(result.steps.step2).toEqual({
            status: 'success',
            output: undefined,
            payload: { count: 5 },

            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          await mastra.stopEventEngine();
        });
      });

      it('should execute a a sleep step', async () => {
        const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
        const step1 = createStep({
          id: 'step1',
          execute,
          inputSchema: z.object({}),
          outputSchema: z.object({ result: z.string() }),
        });
        const step2 = createStep({
          id: 'step2',
          execute: async ({ inputData }) => {
            return { result: 'slept successfully: ' + inputData.result };
          },
          inputSchema: z.object({ result: z.string() }),
          outputSchema: z.object({ result: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({
            result: z.string(),
          }),
          steps: [step1],
        });

        workflow.then(step1).sleep(1000).then(step2).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const startTime = Date.now();
        const result = await run.start({ inputData: {} });
        const endTime = Date.now();

        expect(execute).toHaveBeenCalled();
        expect(result.steps['step1']).toEqual({
          status: 'success',
          output: { result: 'success' },
          payload: {},
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });

        expect(result.steps['step2']).toEqual({
          status: 'success',
          output: { result: 'slept successfully: success' },
          payload: { result: 'success' },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });

        expect(endTime - startTime).toBeGreaterThanOrEqual(1000);
        await mastra.stopEventEngine();
      });

      it('should execute a a sleep until step', async () => {
        const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
        const step1 = createStep({
          id: 'step1',
          execute,
          inputSchema: z.object({}),
          outputSchema: z.object({ result: z.string() }),
        });
        const step2 = createStep({
          id: 'step2',
          execute: async ({ inputData }) => {
            return { result: 'slept successfully: ' + inputData.result };
          },
          inputSchema: z.object({ result: z.string() }),
          outputSchema: z.object({ result: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({
            result: z.string(),
          }),
          steps: [step1],
        });

        workflow
          .then(step1)
          .sleepUntil(new Date(Date.now() + 1000))
          .then(step2)
          .commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const startTime = Date.now();
        const result = await run.start({ inputData: {} });
        const endTime = Date.now();

        expect(execute).toHaveBeenCalled();
        expect(result.steps['step1']).toEqual({
          status: 'success',
          output: { result: 'success' },
          payload: {},
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });

        expect(result.steps['step2']).toEqual({
          status: 'success',
          output: { result: 'slept successfully: success' },
          payload: { result: 'success' },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });

        expect(endTime - startTime).toBeGreaterThan(900);
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('abort', () => {
      it('should be able to abort workflow execution in between steps', async () => {
        const step1 = createStep({
          id: 'step1',
          execute: async ({ inputData }) => {
            return { result: 'step1: ' + inputData.value };
          },
          inputSchema: z.object({ value: z.string() }),
          outputSchema: z.object({ result: z.string() }),
        });
        const step2 = createStep({
          id: 'step2',
          execute: async ({ inputData }) => {
            return { result: 'step2: ' + inputData.result };
          },
          inputSchema: z.object({ result: z.string() }),
          outputSchema: z.object({ result: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({
            result: z.string(),
          }),
          steps: [step1, step2],
        });

        workflow.then(step1).sleep(10000).then(step2).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const p = run.start({ inputData: { value: 'test' } });

        setTimeout(() => {
          run.cancel();
        }, 3000);

        const result = await p;

        expect(result.status).toBe('canceled');
        expect(result.steps['step1']).toEqual({
          status: 'success',
          output: { result: 'step1: test' },
          payload: { value: 'test' },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });

        expect(result.steps['step2']).toBeUndefined();
        await mastra.stopEventEngine();
      });

      it('should be able to abort workflow execution immediately', async () => {
        const step1 = createStep({
          id: 'step1',
          execute: async ({ inputData }) => {
            return { result: 'step1: ' + inputData.value };
          },
          inputSchema: z.object({ value: z.string() }),
          outputSchema: z.object({ result: z.string() }),
        });
        const step2 = createStep({
          id: 'step2',
          execute: async ({ inputData }) => {
            await new Promise(resolve => setTimeout(resolve, 3000));
            return { result: 'step2: ' + inputData.result };
          },
          inputSchema: z.object({ result: z.string() }),
          outputSchema: z.object({ result: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({
            result: z.string(),
          }),
          steps: [step1, step2],
        });

        workflow.then(step1).then(step2).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const p = run.start({ inputData: { value: 'test' } });

        await new Promise(resolve => setTimeout(resolve, 1000));
        await run.cancel();

        const result = await p;

        expect(result.status).toBe('canceled');
        expect(result.steps['step1']).toEqual({
          status: 'success',
          output: { result: 'step1: test' },
          payload: { value: 'test' },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });

        expect(result.steps['step2']).toBeUndefined();
        await mastra.stopEventEngine();
      });

      it('should be able to abort workflow execution during a step', async () => {
        const step1 = createStep({
          id: 'step1',
          execute: async ({ inputData }) => {
            return { result: 'step1: ' + inputData.value };
          },
          inputSchema: z.object({ value: z.string() }),
          outputSchema: z.object({ result: z.string() }),
        });
        const step2 = createStep({
          id: 'step2',
          execute: async ({ inputData, abortSignal, abort }) => {
            const timeout: Promise<string> = new Promise((resolve, _reject) => {
              const ref = setTimeout(() => {
                resolve('step2: ' + inputData.result);
              }, 10000);

              abortSignal.addEventListener('abort', () => {
                resolve('');
                clearTimeout(ref);
              });
            });

            const result = await timeout;
            if (abortSignal.aborted) {
              return abort();
            }
            return { result };
          },
          inputSchema: z.object({ result: z.string() }),
          outputSchema: z.object({ result: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({
            result: z.string(),
          }),
          steps: [step1, step2],
        });

        workflow.then(step1).then(step2).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const p = run.start({ inputData: { value: 'test' } });

        setTimeout(() => {
          run.cancel();
        }, 3000);

        const result = await p;

        expect(result.status).toBe('canceled');
        expect(result.steps['step1']).toEqual({
          status: 'success',
          output: { result: 'step1: test' },
          payload: { value: 'test' },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });

        // expect(result.steps['step2']).toEqual({
        //   status: 'success',
        //   payload: { result: 'step1: test' },
        //   output: undefined,
        //   startedAt: expect.any(Number),
        //   endedAt: expect.any(Number),
        // });
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('Error Handling', () => {
      it('should handle step execution errors', async () => {
        const error = new Error('Step execution failed');
        const failingAction = vi.fn<any>().mockImplementation(() => {
          throw error;
        });

        const step1 = createStep({
          id: 'step1',
          execute: failingAction,
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        workflow.then(step1).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();

        const result = await run.start({ inputData: {} });

        expect(result.status).toBe('failed'); // Assert status first

        // Type guard for result.error
        if (result.status === 'failed') {
          // This check helps TypeScript narrow down the type of 'result'
          expect(result.error).toMatch(/^Error: Step execution failed/); // Now safe to access
        } else {
          // This case should not be reached in this specific test.
          // If it is, the test should fail clearly.
          throw new Error("Assertion failed: workflow status was not 'failed' as expected.");
        }

        expect(result.steps?.input).toEqual({});
        const step1Result = result.steps?.step1;
        expect(step1Result).toBeDefined();
        expect(step1Result).toMatchObject({
          status: 'failed',
          payload: {},
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        expect((step1Result as any)?.error).toMatch(/^Error: Step execution failed/); // Check message prefix
        await mastra.stopEventEngine();
      });

      it('should handle variable resolution errors', async () => {
        const step1 = createStep({
          id: 'step1',
          execute: vi.fn<any>().mockResolvedValue({ data: 'success' }),
          inputSchema: z.object({}),
          outputSchema: z.object({ data: z.string() }),
        });
        const step2 = createStep({
          id: 'step2',
          execute: vi.fn<any>(),
          inputSchema: z.object({ data: z.string() }),
          outputSchema: z.object({}),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        workflow
          .then(step1)
          .map({
            data: { step: step1, path: 'data' },
          })
          .then(step2)
          .commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        await expect(run.start({ inputData: {} })).resolves.toMatchObject({
          steps: {
            step1: {
              status: 'success',
              output: {
                data: 'success',
              },
              payload: {},
              startedAt: expect.any(Number),
              endedAt: expect.any(Number),
            },
            step2: {
              status: 'success',
              payload: {
                data: 'success',
              },
              startedAt: expect.any(Number),
              endedAt: expect.any(Number),
            },
          },
        });
        await mastra.stopEventEngine();
      });

      it('should handle step execution errors within branches', async () => {
        const error = new Error('Step execution failed');
        const failingAction = async () => {
          await new Promise(resolve => setTimeout(resolve, 2e3));
          throw error;
        };

        const successAction = vi.fn<any>().mockResolvedValue({});

        const step1 = createStep({
          id: 'step1',
          execute: successAction,
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        const step2 = createStep({
          id: 'step2',
          execute: failingAction,
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        const step3 = createStep({
          id: 'step3',
          execute: successAction,
          inputSchema: z.object({
            step1: z.object({}),
            step2: z.object({}),
          }),
          outputSchema: z.object({}),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        workflow.parallel([step1, step2]).then(step3).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const result = await run.start({ inputData: {} });

        expect(result.steps).toMatchObject({
          step1: {
            status: 'success',
            payload: {},
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
          step2: {
            status: 'failed',
            // error: error?.stack ?? error, // Removed this line
            payload: {},
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
        });
        expect((result.steps?.step2 as any)?.error).toMatch(/^Error: Step execution failed/);
        await mastra.stopEventEngine();
      });

      it('should handle step execution errors within nested workflows', async () => {
        const error = new Error('Step execution failed');
        const failingAction = vi.fn<any>().mockImplementation(() => {
          throw error;
        });

        const successAction = vi.fn<any>().mockResolvedValue({});

        const step1 = createStep({
          id: 'step1',
          execute: successAction,
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        const step2 = createStep({
          id: 'step2',
          execute: failingAction,
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        const step3 = createStep({
          id: 'step3',
          execute: successAction,
          inputSchema: z.object({
            step1: z.object({}),
            step2: z.object({}),
          }),
          outputSchema: z.object({}),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        workflow.parallel([step1, step2]).then(step3).commit();

        const mainWorkflow = createWorkflow({
          id: 'main-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        })
          .then(workflow)
          .commit();

        const mastra = new Mastra({
          workflows: { 'main-workflow': mainWorkflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await mainWorkflow.createRun();
        const result = await run.start({ inputData: {} });

        expect(result.steps).toMatchObject({
          'test-workflow': {
            status: 'failed',
            // error: error?.stack ?? error, // Removed this line
            payload: {},
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
        });

        expect((result.steps?.['test-workflow'] as any)?.error).toMatch(/^Error: Step execution failed/);
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('Complex Conditions', () => {
      it('should handle nested AND/OR conditions', async () => {
        const step1Action = vi.fn<any>().mockResolvedValue({
          status: 'partial',
          score: 75,
          flags: { isValid: true },
        });
        const step2Action = vi.fn<any>().mockResolvedValue({ result: 'step2' });
        const step3Action = vi.fn<any>().mockResolvedValue({ result: 'step3' });

        const step1 = createStep({
          id: 'step1',
          execute: step1Action,
          inputSchema: z.object({}),
          outputSchema: z.object({
            status: z.string(),
            score: z.number(),
            flags: z.object({ isValid: z.boolean() }),
          }),
        });
        const step2 = createStep({
          id: 'step2',
          execute: step2Action,
          inputSchema: z.object({
            status: z.string(),
            score: z.number(),
            flags: z.object({ isValid: z.boolean() }),
          }),
          outputSchema: z.object({ result: z.string() }),
        });
        const step3 = createStep({
          id: 'step3',
          execute: step3Action,
          inputSchema: z.object({
            result: z.string(),
          }),
          outputSchema: z.object({ result: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        workflow
          .then(step1)
          .branch([
            [
              async ({ getStepResult }) => {
                const step1Result = getStepResult(step1);
                return (
                  step1Result?.status === 'success' || (step1Result?.status === 'partial' && step1Result?.score >= 70)
                );
              },
              step2,
            ],
          ])
          .map({
            result: {
              step: step2,
              path: 'result',
            },
          })
          .branch([
            [
              async ({ inputData, getStepResult }) => {
                const step1Result = getStepResult(step1);
                return !inputData.result || step1Result?.score < 70;
              },
              step3,
            ],
          ])
          .map({
            result: {
              step: step3,
              path: 'result',
            },
          })
          .commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const result = await run.start({ inputData: {} });

        expect(step2Action).toHaveBeenCalled();
        expect(step3Action).not.toHaveBeenCalled();
        expect(result.steps.step2).toEqual({
          status: 'success',
          output: { result: 'step2' },
          payload: {
            status: 'partial',
            score: 75,
            flags: { isValid: true },
          },

          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('Loops', () => {
      it('should run an until loop', async () => {
        const increment = vi.fn().mockImplementation(async ({ inputData }) => {
          // Get the current value (either from trigger or previous increment)
          const currentValue = inputData.value;

          // Increment the value
          const newValue = currentValue + 1;

          return { value: newValue };
        });
        const incrementStep = createStep({
          id: 'increment',
          description: 'Increments the current value by 1',
          inputSchema: z.object({
            value: z.number(),
            target: z.number(),
          }),
          outputSchema: z.object({
            value: z.number(),
          }),
          execute: increment,
        });

        const final = vi.fn().mockImplementation(async ({ inputData }) => {
          return { finalValue: inputData?.value };
        });
        const finalStep = createStep({
          id: 'final',
          description: 'Final step that prints the result',
          inputSchema: z.object({
            value: z.number(),
          }),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
          execute: final,
        });

        const counterWorkflow = createWorkflow({
          steps: [incrementStep, finalStep],
          id: 'counter-workflow',
          inputSchema: z.object({
            target: z.number(),
            value: z.number(),
          }),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
        });

        counterWorkflow
          .dountil(incrementStep, async ({ inputData }) => {
            return (inputData?.value ?? 0) >= 12;
          })
          .then(finalStep)
          .commit();

        const mastra = new Mastra({
          workflows: { 'counter-workflow': counterWorkflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await counterWorkflow.createRun();
        const result = await run.start({ inputData: { target: 10, value: 0 } });

        expect(increment).toHaveBeenCalledTimes(12);
        expect(final).toHaveBeenCalledTimes(1);
        // @ts-ignore
        expect(result.result).toEqual({ finalValue: 12 });
        // @ts-ignore
        expect(result.steps.increment.output).toEqual({ value: 12 });
        await mastra.stopEventEngine();
      });

      it('should run a while loop', async () => {
        const increment = vi.fn().mockImplementation(async ({ inputData }) => {
          // Get the current value (either from trigger or previous increment)
          const currentValue = inputData.value;

          // Increment the value
          const newValue = currentValue + 1;

          return { value: newValue };
        });
        const incrementStep = createStep({
          id: 'increment',
          description: 'Increments the current value by 1',
          inputSchema: z.object({
            value: z.number(),
            target: z.number(),
          }),
          outputSchema: z.object({
            value: z.number(),
          }),
          execute: increment,
        });

        const final = vi.fn().mockImplementation(async ({ inputData }) => {
          return { finalValue: inputData?.value };
        });
        const finalStep = createStep({
          id: 'final',
          description: 'Final step that prints the result',
          inputSchema: z.object({
            value: z.number(),
          }),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
          execute: final,
        });

        const counterWorkflow = createWorkflow({
          steps: [incrementStep, finalStep],
          id: 'counter-workflow',
          inputSchema: z.object({
            target: z.number(),
            value: z.number(),
          }),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
        });

        counterWorkflow
          .dowhile(incrementStep, async ({ inputData }) => {
            return (inputData?.value ?? 0) < 12;
          })
          .then(finalStep)
          .commit();

        const mastra = new Mastra({
          workflows: { 'counter-workflow': counterWorkflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await counterWorkflow.createRun();
        const result = await run.start({ inputData: { target: 10, value: 0 } });

        expect(increment).toHaveBeenCalledTimes(12);
        expect(final).toHaveBeenCalledTimes(1);
        // @ts-ignore
        expect(result.result).toEqual({ finalValue: 12 });
        // @ts-ignore
        expect(result.steps.increment.output).toEqual({ value: 12 });
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('foreach', () => {
      it('should run a single item concurrency (default) for loop', async () => {
        const startTime = Date.now();
        const map = vi.fn().mockImplementation(async ({ inputData }) => {
          await new Promise(resolve => setTimeout(resolve, 1e3));
          return { value: inputData.value + 11 };
        });
        const mapStep = createStep({
          id: 'map',
          description: 'Maps (+11) on the current value',
          inputSchema: z.object({
            value: z.number(),
          }),
          outputSchema: z.object({
            value: z.number(),
          }),
          execute: map,
        });

        const finalStep = createStep({
          id: 'final',
          description: 'Final step that prints the result',
          inputSchema: z.array(z.object({ value: z.number() })),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
          execute: async ({ inputData }) => {
            return { finalValue: inputData.reduce((acc, curr) => acc + curr.value, 0) };
          },
        });

        const counterWorkflow = createWorkflow({
          steps: [mapStep, finalStep],
          id: 'counter-workflow',
          inputSchema: z.array(z.object({ value: z.number() })),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
        });

        counterWorkflow.foreach(mapStep).then(finalStep).commit();

        const mastra = new Mastra({
          workflows: { 'counter-workflow': counterWorkflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await counterWorkflow.createRun();
        const result = await run.start({ inputData: [{ value: 1 }, { value: 22 }, { value: 333 }] });

        const endTime = Date.now();
        const duration = endTime - startTime;
        expect(duration).toBeGreaterThan(3e3 - 200);

        expect(map).toHaveBeenCalledTimes(3);
        expect(result.steps).toEqual({
          input: [{ value: 1 }, { value: 22 }, { value: 333 }],
          map: {
            status: 'success',
            output: [{ value: 12 }, { value: 33 }, { value: 344 }],
            payload: [{ value: 1 }, { value: 22 }, { value: 333 }],
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
          final: {
            status: 'success',
            output: { finalValue: 1 + 11 + (22 + 11) + (333 + 11) },
            payload: [{ value: 12 }, { value: 33 }, { value: 344 }],
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
        });
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('if-else branching', () => {
      it('should run the if-then branch', async () => {
        const start = vi.fn().mockImplementation(async ({ inputData }) => {
          // Get the current value (either from trigger or previous increment)

          // Increment the value
          const newValue = (inputData?.startValue ?? 0) + 1;

          return { newValue };
        });
        const startStep = createStep({
          id: 'start',
          description: 'Increments the current value by 1',
          inputSchema: z.object({
            startValue: z.number(),
          }),
          outputSchema: z.object({
            newValue: z.number(),
          }),
          execute: start,
        });

        const other = vi.fn().mockImplementation(async () => {
          return { other: 26 };
        });
        const otherStep = createStep({
          id: 'other',
          description: 'Other step',
          inputSchema: z.object({ newValue: z.number() }),
          outputSchema: z.object({
            other: z.number(),
          }),
          execute: other,
        });

        const final = vi.fn().mockImplementation(async ({ getStepResult }) => {
          const startVal = getStepResult(startStep)?.newValue ?? 0;
          const otherVal = getStepResult(otherStep)?.other ?? 0;
          return { finalValue: startVal + otherVal };
        });
        const finalIf = createStep({
          id: 'finalIf',
          description: 'Final step that prints the result',
          inputSchema: z.object({ newValue: z.number() }),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
          execute: final,
        });
        const finalElse = createStep({
          id: 'finalElse',
          description: 'Final step that prints the result',
          inputSchema: z.object({ other: z.number() }),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
          execute: final,
        });

        const counterWorkflow = createWorkflow({
          id: 'counter-workflow',
          inputSchema: z.object({
            startValue: z.number(),
          }),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
          steps: [startStep, finalIf],
        });

        const elseBranch = createWorkflow({
          id: 'else-branch',
          inputSchema: z.object({ newValue: z.number() }),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
          steps: [otherStep, finalElse],
        })
          .then(otherStep)
          .then(finalElse)
          .commit();

        counterWorkflow
          .then(startStep)
          .branch([
            [
              async ({ inputData }) => {
                const current = inputData.newValue;
                return !current || current < 5;
              },
              finalIf,
            ],
            [
              async ({ inputData }) => {
                const current = inputData.newValue;
                return current >= 5;
              },
              elseBranch,
            ],
          ])
          .commit();

        const mastra = new Mastra({
          workflows: { 'counter-workflow': counterWorkflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await counterWorkflow.createRun();
        const result = await run.start({ inputData: { startValue: 1 } });

        expect(start).toHaveBeenCalledTimes(1);
        expect(other).toHaveBeenCalledTimes(0);
        expect(final).toHaveBeenCalledTimes(1);
        // @ts-ignore
        expect(result.steps.finalIf.output).toEqual({ finalValue: 2 });
        // @ts-ignore
        expect(result.steps.start.output).toEqual({ newValue: 2 });
        await mastra.stopEventEngine();
      });

      it('should run the else branch', async () => {
        const start = vi.fn().mockImplementation(async ({ inputData }) => {
          // Get the current value (either from trigger or previous increment)

          // Increment the value
          const newValue = (inputData?.startValue ?? 0) + 1;

          return { newValue };
        });
        const startStep = createStep({
          id: 'start',
          description: 'Increments the current value by 1',
          inputSchema: z.object({
            startValue: z.number(),
          }),
          outputSchema: z.object({
            newValue: z.number(),
          }),
          execute: start,
        });

        const other = vi.fn().mockImplementation(async ({ inputData }) => {
          return { newValue: inputData.newValue, other: 26 };
        });
        const otherStep = createStep({
          id: 'other',
          description: 'Other step',
          inputSchema: z.object({ newValue: z.number() }),
          outputSchema: z.object({
            other: z.number(),
            newValue: z.number(),
          }),
          execute: other,
        });

        const final = vi.fn().mockImplementation(async ({ inputData }) => {
          const startVal = inputData?.newValue ?? 0;
          const otherVal = inputData?.other ?? 0;
          return { finalValue: startVal + otherVal };
        });
        const finalIf = createStep({
          id: 'finalIf',
          description: 'Final step that prints the result',
          inputSchema: z.object({ newValue: z.number() }),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
          execute: final,
        });
        const finalElse = createStep({
          id: 'finalElse',
          description: 'Final step that prints the result',
          inputSchema: z.object({ other: z.number(), newValue: z.number() }),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
          execute: final,
        });

        const counterWorkflow = createWorkflow({
          id: 'counter-workflow',
          inputSchema: z.object({
            startValue: z.number(),
          }),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
          steps: [startStep, finalIf],
        });

        const elseBranch = createWorkflow({
          id: 'else-branch',
          inputSchema: z.object({ newValue: z.number() }),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
          steps: [otherStep, finalElse],
        })
          .then(otherStep)
          .then(finalElse)
          .commit();

        counterWorkflow
          .then(startStep)
          .branch([
            [
              async ({ inputData }) => {
                const current = inputData.newValue;
                return !current || current < 5;
              },
              finalIf,
            ],
            [
              async ({ inputData }) => {
                const current = inputData.newValue;
                return current >= 5;
              },
              elseBranch,
            ],
          ])
          .commit();

        const mastra = new Mastra({
          workflows: { 'counter-workflow': counterWorkflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await counterWorkflow.createRun();
        const result = await run.start({ inputData: { startValue: 6 } });

        expect(start).toHaveBeenCalledTimes(1);
        expect(other).toHaveBeenCalledTimes(1);
        expect(final).toHaveBeenCalledTimes(1);
        // @ts-ignore
        expect(result.steps['else-branch'].output).toEqual({ finalValue: 26 + 6 + 1 });
        // @ts-ignore
        expect(result.steps.start.output).toEqual({ newValue: 7 });
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('multiple chains', () => {
      it('should run multiple chains in parallel', async () => {
        const step1 = createStep({
          id: 'step1',
          execute: vi.fn<any>().mockResolvedValue({ result: 'success1' }),
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });
        const step2 = createStep({
          id: 'step2',
          execute: vi.fn<any>().mockResolvedValue({ result: 'success2' }),
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });
        const step3 = createStep({
          id: 'step3',
          execute: vi.fn<any>().mockResolvedValue({ result: 'success3' }),
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });
        const step4 = createStep({
          id: 'step4',
          execute: vi.fn<any>().mockResolvedValue({ result: 'success4' }),
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });
        const step5 = createStep({
          id: 'step5',
          execute: vi.fn<any>().mockResolvedValue({ result: 'success5' }),
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
          steps: [step1, step2, step3, step4, step5],
        });
        workflow
          .parallel([
            createWorkflow({
              id: 'nested-a',
              inputSchema: z.object({}),
              outputSchema: z.object({}),
              steps: [step1, step2, step3],
            })
              .then(step1)
              .then(step2)
              .then(step3)
              .commit(),
            createWorkflow({
              id: 'nested-b',
              inputSchema: z.object({}),
              outputSchema: z.object({}),
              steps: [step4, step5],
            })
              .then(step4)
              .then(step5)
              .commit(),
          ])
          .commit();

        const mastra = new Mastra({
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
          workflows: {
            'test-workflow': workflow,
          },
          storage: testStorage,
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const result = await run.start({ inputData: {} });

        expect(result.steps['nested-a']).toEqual({
          status: 'success',
          output: { result: 'success3' },
          payload: {},
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        expect(result.steps['nested-b']).toEqual({
          status: 'success',
          output: { result: 'success5' },
          payload: {},
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('Retry', () => {
      it('should retry a step default 0 times', async () => {
        let err: Error | undefined;
        const step1 = createStep({
          id: 'step1',
          execute: vi.fn<any>().mockResolvedValue({ result: 'success' }),
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });
        const step2 = createStep({
          id: 'step2',
          execute: vi.fn<any>().mockImplementation(() => {
            err = new Error('Step failed');
            throw err;
          }),
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        const mastra = new Mastra({
          logger: false,
          workflows: {
            'test-workflow': workflow,
          },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        workflow.then(step1).then(step2).commit();

        const run = await workflow.createRun();
        const result = await run.start({ inputData: {} });

        expect(result.steps.step1).toEqual({
          status: 'success',
          output: { result: 'success' },
          payload: {},
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        expect(result.steps.step2).toMatchObject({
          // Change to toMatchObject
          status: 'failed',
          // error: err?.stack ?? err, // REMOVE THIS LINE
          payload: { result: 'success' },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        // ADD THIS SEPARATE ASSERTION
        expect((result.steps.step2 as any)?.error).toMatch(/^Error: Step failed/);
        expect(step1.execute).toHaveBeenCalledTimes(1);
        expect(step2.execute).toHaveBeenCalledTimes(1); // 0 retries + 1 initial call
        await mastra.stopEventEngine();
      });

      it('should retry a step with a custom retry config', async () => {
        let err: Error | undefined;
        const step1 = createStep({
          id: 'step1',
          execute: vi.fn<any>().mockResolvedValue({ result: 'success' }),
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });
        const step2 = createStep({
          id: 'step2',
          execute: vi.fn<any>().mockImplementation(() => {
            err = new Error('Step failed');
            throw err;
          }),
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
          retryConfig: { attempts: 5, delay: 200 },
        });

        const mastra = new Mastra({
          logger: false,
          storage: testStorage,
          workflows: {
            'test-workflow': workflow,
          },
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        workflow.then(step1).then(step2).commit();

        const run = await workflow.createRun();
        const result = await run.start({ inputData: {} });

        expect(result.steps.step1).toEqual({
          status: 'success',
          output: { result: 'success' },
          payload: {},
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        expect(result.steps.step2).toMatchObject({
          // Change to toMatchObject
          status: 'failed',
          // error: err?.stack ?? err, // REMOVE THIS LINE
          payload: { result: 'success' },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        // ADD THIS SEPARATE ASSERTION
        expect((result.steps.step2 as any)?.error).toMatch(/^Error: Step failed/);
        expect(step1.execute).toHaveBeenCalledTimes(1);
        expect(step2.execute).toHaveBeenCalledTimes(6); // 5 retries + 1 initial call
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('Interoperability (Actions)', () => {
      it('should be able to use all action types in a workflow', async () => {
        const step1Action = vi.fn<any>().mockResolvedValue({ name: 'step1' });

        const step1 = createStep({
          id: 'step1',
          execute: step1Action,
          inputSchema: z.object({}),
          outputSchema: z.object({ name: z.string() }),
        });

        // @ts-ignore
        const toolAction = vi.fn<any>().mockImplementation(async ({ context }) => {
          return { name: context.name };
        });

        const randomTool = createTool({
          id: 'random-tool',
          execute: toolAction,
          description: 'random-tool',
          inputSchema: z.object({ name: z.string() }),
          outputSchema: z.object({ name: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({ name: z.string() }),
        });

        workflow.then(step1).then(createStep(randomTool)).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const result = await run.start({ inputData: {} });

        expect(step1Action).toHaveBeenCalled();
        expect(toolAction).toHaveBeenCalled();
        // @ts-ignore
        expect(result.steps.step1).toEqual({
          status: 'success',
          output: { name: 'step1' },
          payload: {},
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        // @ts-ignore
        expect(result.steps['random-tool']).toEqual({
          status: 'success',
          output: { name: 'step1' },
          payload: { name: 'step1' },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('Suspend and Resume', () => {
      afterAll(async () => {
        const pathToDb = path.join(process.cwd(), 'mastra.db');

        if (fs.existsSync(pathToDb)) {
          fs.rmSync(pathToDb);
        }
      });
      it('should return the correct runId', async () => {
        const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
        const step1 = createStep({
          id: 'step1',
          execute,
          inputSchema: z.object({}),
          outputSchema: z.object({ result: z.string() }),
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
          steps: [step1],
        })
          .then(step1)
          .commit();
        const run = await workflow.createRun();
        const run2 = await workflow.createRun({ runId: run.runId });

        expect(run.runId).toBeDefined();
        expect(run2.runId).toBeDefined();
        expect(run.runId).toBe(run2.runId);
      });

      it('should handle basic suspend and resume flow with async await syntax', async () => {
        const getUserInputAction = vi.fn().mockResolvedValue({ userInput: 'test input' });
        const promptAgentAction = vi
          .fn()
          .mockImplementationOnce(async ({ suspend }) => {
            return suspend({ testPayload: 'hello' });
          })
          .mockImplementationOnce(() => ({ modelOutput: 'test output' }));
        const evaluateToneAction = vi.fn().mockResolvedValue({
          toneScore: { score: 0.8 },
          completenessScore: { score: 0.7 },
        });
        const improveResponseAction = vi
          .fn()
          .mockImplementationOnce(async ({ suspend }) => {
            await suspend();
            return undefined;
          })
          .mockImplementationOnce(() => ({ improvedOutput: 'improved output' }));
        const evaluateImprovedAction = vi.fn().mockResolvedValue({
          toneScore: { score: 0.9 },
          completenessScore: { score: 0.8 },
        });

        const getUserInput = createStep({
          id: 'getUserInput',
          execute: getUserInputAction,
          inputSchema: z.object({ input: z.string() }),
          outputSchema: z.object({ userInput: z.string() }),
        });
        const promptAgent = createStep({
          id: 'promptAgent',
          execute: promptAgentAction,
          inputSchema: z.object({ userInput: z.string() }),
          outputSchema: z.object({ modelOutput: z.string() }),
          suspendSchema: z.object({ testPayload: z.string() }),
          resumeSchema: z.object({ userInput: z.string() }),
        });
        const evaluateTone = createStep({
          id: 'evaluateToneConsistency',
          execute: evaluateToneAction,
          inputSchema: z.object({ modelOutput: z.string() }),
          outputSchema: z.object({
            toneScore: z.any(),
            completenessScore: z.any(),
          }),
        });
        const improveResponse = createStep({
          id: 'improveResponse',
          execute: improveResponseAction,
          resumeSchema: z.object({
            toneScore: z.object({ score: z.number() }),
            completenessScore: z.object({ score: z.number() }),
          }),
          inputSchema: z.object({ toneScore: z.any(), completenessScore: z.any() }),
          outputSchema: z.object({ improvedOutput: z.string() }),
        });
        const evaluateImproved = createStep({
          id: 'evaluateImprovedResponse',
          execute: evaluateImprovedAction,
          inputSchema: z.object({ improvedOutput: z.string() }),
          outputSchema: z.object({
            toneScore: z.any(),
            completenessScore: z.any(),
          }),
        });

        const promptEvalWorkflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({ input: z.string() }),
          outputSchema: z.object({}),
        });

        promptEvalWorkflow
          .then(getUserInput)
          .then(promptAgent)
          .then(evaluateTone)
          .then(improveResponse)
          .then(evaluateImproved)
          .commit();

        const mastra = new Mastra({
          logger: false,
          storage: testStorage,
          workflows: { 'test-workflow': promptEvalWorkflow },
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await promptEvalWorkflow.createRun();

        const initialResult = await run.start({ inputData: { input: 'test' } });
        expect(initialResult.steps.promptAgent.status).toBe('suspended');
        expect(promptAgentAction).toHaveBeenCalledTimes(1);
        // expect(initialResult.activePaths.size).toBe(1);
        // expect(initialResult.activePaths.get('promptAgent')?.status).toBe('suspended');
        // expect(initialResult.activePaths.get('promptAgent')?.suspendPayload).toEqual({ testPayload: 'hello' });
        expect(initialResult.steps).toEqual({
          input: { input: 'test' },
          getUserInput: {
            status: 'success',
            output: { userInput: 'test input' },
            payload: { input: 'test' },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
          promptAgent: {
            status: 'suspended',
            payload: { userInput: 'test input' },
            suspendPayload: {
              testPayload: 'hello',
              __workflow_meta: {
                path: ['promptAgent'],
                runId: expect.any(String),
              },
            },
            startedAt: expect.any(Number),
            suspendedAt: expect.any(Number),
          },
        });

        const newCtx = {
          userInput: 'test input for resumption',
        };

        expect(initialResult.steps.promptAgent.status).toBe('suspended');
        expect(promptAgentAction).toHaveBeenCalledTimes(1);

        const firstResumeResult = await run.resume({ step: 'promptAgent', resumeData: newCtx });
        if (!firstResumeResult) {
          throw new Error('Resume failed to return a result');
        }

        // expect(firstResumeResult.activePaths.size).toBe(1);
        // expect(firstResumeResult.activePaths.get('improveResponse')?.status).toBe('suspended');
        expect(firstResumeResult.steps).toEqual({
          input: { input: 'test' },
          getUserInput: {
            status: 'success',
            output: { userInput: 'test input' },
            payload: { input: 'test' },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
          promptAgent: {
            status: 'success',
            output: { modelOutput: 'test output' },
            payload: { userInput: 'test input' },
            suspendPayload: { testPayload: 'hello' },
            resumePayload: { userInput: 'test input for resumption' },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
            suspendedAt: expect.any(Number),
            resumedAt: expect.any(Number),
          },
          evaluateToneConsistency: {
            status: 'success',
            output: {
              toneScore: { score: 0.8 },
              completenessScore: { score: 0.7 },
            },
            payload: { modelOutput: 'test output' },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
          improveResponse: {
            status: 'suspended',
            payload: {
              toneScore: { score: 0.8 },
              completenessScore: { score: 0.7 },
            },
            suspendPayload: {
              __workflow_meta: {
                path: ['improveResponse'],
                runId: expect.any(String),
              },
            },
            startedAt: expect.any(Number),
            suspendedAt: expect.any(Number),
          },
        });

        const secondResumeResult = await run.resume({
          step: improveResponse,
          resumeData: {
            toneScore: { score: 0.8 },
            completenessScore: { score: 0.7 },
          },
        });
        if (!secondResumeResult) {
          throw new Error('Resume failed to return a result');
        }

        expect(promptAgentAction).toHaveBeenCalledTimes(2);

        expect(secondResumeResult.steps).toEqual({
          input: { input: 'test' },
          getUserInput: {
            status: 'success',
            output: { userInput: 'test input' },
            payload: { input: 'test' },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
          promptAgent: {
            status: 'success',
            output: { modelOutput: 'test output' },
            payload: { userInput: 'test input' },
            suspendPayload: { testPayload: 'hello' },
            resumePayload: { userInput: 'test input for resumption' },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
            suspendedAt: expect.any(Number),
            resumedAt: expect.any(Number),
          },
          evaluateToneConsistency: {
            status: 'success',
            output: {
              toneScore: { score: 0.8 },
              completenessScore: { score: 0.7 },
            },
            payload: { modelOutput: 'test output' },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
          improveResponse: {
            status: 'success',
            output: { improvedOutput: 'improved output' },
            payload: { toneScore: { score: 0.8 }, completenessScore: { score: 0.7 } },
            resumePayload: {
              toneScore: { score: 0.8 },
              completenessScore: { score: 0.7 },
            },
            suspendPayload: {},
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
            suspendedAt: expect.any(Number),
            resumedAt: expect.any(Number),
          },
          evaluateImprovedResponse: {
            status: 'success',
            output: { toneScore: { score: 0.9 }, completenessScore: { score: 0.8 } },
            payload: { improvedOutput: 'improved output' },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          },
        });

        expect(promptAgentAction).toHaveBeenCalledTimes(2);
        await mastra.stopEventEngine();
      });

      it('should work with requestContext - bug #4442', async () => {
        const getUserInputAction = vi.fn().mockResolvedValue({ userInput: 'test input' });
        const promptAgentAction = vi.fn().mockImplementation(async ({ suspend, requestContext, resumeData }) => {
          if (!resumeData) {
            requestContext.set('responses', [...(requestContext.get('responses') ?? []), 'first message']);
            return await suspend({ testPayload: 'hello' });
          }

          requestContext.set('responses', [...(requestContext.get('responses') ?? []), 'promptAgentAction']);

          return undefined;
        });
        const requestContextAction = vi.fn().mockImplementation(async ({ requestContext }) => {
          return requestContext.get('responses');
        });

        const getUserInput = createStep({
          id: 'getUserInput',
          execute: getUserInputAction,
          inputSchema: z.object({ input: z.string() }),
          outputSchema: z.object({ userInput: z.string() }),
        });
        const promptAgent = createStep({
          id: 'promptAgent',
          execute: promptAgentAction,
          inputSchema: z.object({ userInput: z.string() }),
          outputSchema: z.object({ modelOutput: z.string() }),
          suspendSchema: z.object({ testPayload: z.string() }),
          resumeSchema: z.object({ userInput: z.string() }),
        });
        const requestContextStep = createStep({
          id: 'requestContextAction',
          execute: requestContextAction,
          inputSchema: z.object({ modelOutput: z.string() }),
          outputSchema: z.array(z.string()),
        });

        const promptEvalWorkflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({ input: z.string() }),
          outputSchema: z.object({}),
        });

        promptEvalWorkflow.then(getUserInput).then(promptAgent).then(requestContextStep).commit();

        const mastra = new Mastra({
          logger: false,
          storage: testStorage,
          workflows: { 'test-workflow': promptEvalWorkflow },
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await promptEvalWorkflow.createRun();

        const initialResult = await run.start({ inputData: { input: 'test' } });
        expect(initialResult.steps.promptAgent.status).toBe('suspended');
        expect(promptAgentAction).toHaveBeenCalledTimes(1);

        const newCtx = {
          userInput: 'test input for resumption',
        };

        const firstResumeResult = await run.resume({ step: 'promptAgent', resumeData: newCtx });
        expect(promptAgentAction).toHaveBeenCalledTimes(2);
        expect(firstResumeResult.steps.requestContextAction.status).toBe('success');
        // @ts-ignore
        expect(firstResumeResult.steps.requestContextAction.output).toEqual(['first message', 'promptAgentAction']);
        await mastra.stopEventEngine();
      });

      it('should work with custom requestContext - bug #4442', async () => {
        const getUserInputAction = vi.fn().mockResolvedValue({ userInput: 'test input' });
        const promptAgentAction = vi.fn().mockImplementation(async ({ suspend, requestContext, resumeData }) => {
          if (!resumeData) {
            requestContext.set('responses', [...(requestContext.get('responses') ?? []), 'first message']);
            return await suspend({ testPayload: 'hello' });
          }

          requestContext.set('responses', [...(requestContext.get('responses') ?? []), 'promptAgentAction']);

          return undefined;
        });
        const requestContextAction = vi.fn().mockImplementation(async ({ requestContext }) => {
          return requestContext.get('responses');
        });

        const getUserInput = createStep({
          id: 'getUserInput',
          execute: getUserInputAction,
          inputSchema: z.object({ input: z.string() }),
          outputSchema: z.object({ userInput: z.string() }),
        });
        const promptAgent = createStep({
          id: 'promptAgent',
          execute: promptAgentAction,
          inputSchema: z.object({ userInput: z.string() }),
          outputSchema: z.object({ modelOutput: z.string() }),
          suspendSchema: z.object({ testPayload: z.string() }),
          resumeSchema: z.object({ userInput: z.string() }),
        });
        const requestContextStep = createStep({
          id: 'requestContextAction',
          execute: requestContextAction,
          inputSchema: z.object({ modelOutput: z.string() }),
          outputSchema: z.array(z.string()),
        });

        const promptEvalWorkflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({ input: z.string() }),
          outputSchema: z.object({}),
        });

        promptEvalWorkflow.then(getUserInput).then(promptAgent).then(requestContextStep).commit();

        const mastra = new Mastra({
          logger: false,
          storage: testStorage,
          workflows: { 'test-workflow': promptEvalWorkflow },
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await promptEvalWorkflow.createRun();

        const requestContext = new RequestContext();
        const initialResult = await run.start({ inputData: { input: 'test' }, requestContext });
        expect(initialResult.steps.promptAgent.status).toBe('suspended');
        expect(promptAgentAction).toHaveBeenCalledTimes(1);
        // NOTE: this won't work with evented systems, the map isn't shared
        // expect(requestContext.get('responses')).toEqual(['first message']);

        const newCtx = {
          userInput: 'test input for resumption',
        };

        const firstResumeResult = await run.resume({ step: 'promptAgent', resumeData: newCtx, requestContext });
        expect(promptAgentAction).toHaveBeenCalledTimes(2);
        expect(firstResumeResult.steps.requestContextAction.status).toBe('success');
        // @ts-ignore
        expect(firstResumeResult.steps.requestContextAction.output).toEqual(['first message', 'promptAgentAction']);
        await mastra.stopEventEngine();
      });

      it('should handle basic suspend and resume in a dountil workflow', async () => {
        const resumeStep = createStep({
          id: 'resume',
          inputSchema: z.object({ value: z.number() }),
          outputSchema: z.object({ value: z.number() }),
          resumeSchema: z.object({ value: z.number() }),
          suspendSchema: z.object({ message: z.string() }),
          execute: async ({ inputData, resumeData, suspend }) => {
            console.info('inputData is ', inputData);
            console.info('resumeData is ', resumeData);

            const finalValue = (resumeData?.value ?? 0) + inputData.value;

            if (!resumeData?.value || finalValue < 10) {
              return await suspend({
                message: `Please provide additional information. now value is ${inputData.value}`,
              });
            }

            return { value: finalValue };
          },
        });

        const incrementStep = createStep({
          id: 'increment',
          inputSchema: z.object({
            value: z.number(),
          }),
          outputSchema: z.object({
            value: z.number(),
          }),
          execute: async ({ inputData }) => {
            return {
              value: inputData.value + 1,
            };
          },
        });

        const dowhileWorkflow = createWorkflow({
          id: 'dowhile-workflow',
          inputSchema: z.object({ value: z.number() }),
          outputSchema: z.object({ value: z.number() }),
        })
          .dountil(
            createWorkflow({
              id: 'simple-resume-workflow',
              inputSchema: z.object({ value: z.number() }),
              outputSchema: z.object({ value: z.number() }),
              steps: [incrementStep, resumeStep],
            })
              .then(incrementStep)
              .then(resumeStep)
              .commit(),
            async ({ inputData }) => inputData.value >= 10,
          )
          .then(
            createStep({
              id: 'final',
              inputSchema: z.object({ value: z.number() }),
              outputSchema: z.object({ value: z.number() }),
              execute: async ({ inputData }) => ({ value: inputData.value }),
            }),
          )
          .commit();

        const mastra = new Mastra({
          logger: false,
          storage: testStorage,
          workflows: { 'dowhile-workflow': dowhileWorkflow },
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await dowhileWorkflow.createRun();
        const result = await run.start({ inputData: { value: 0 } });
        expect(result.steps['simple-resume-workflow']).toMatchObject({
          status: 'suspended',
        });

        const resumeResult = await run.resume({
          resumeData: { value: 2 },
          step: ['simple-resume-workflow', 'resume'],
        });

        expect(resumeResult.steps['simple-resume-workflow']).toMatchObject({
          status: 'suspended',
        });

        const lastResumeResult = await run.resume({
          resumeData: { value: 21 },
          step: ['simple-resume-workflow', 'resume'],
        });

        expect(lastResumeResult.steps['simple-resume-workflow']).toMatchObject({
          status: 'success',
        });
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('Workflow Runs', () => {
      let testStorage;

      beforeEach(async () => {
        testStorage = new MockStore();
      });

      it('should return empty result when mastra is not initialized', async () => {
        const workflow = createWorkflow({ id: 'test', inputSchema: z.object({}), outputSchema: z.object({}) });
        const result = await workflow.listWorkflowRuns();
        expect(result).toEqual({ runs: [], total: 0 });
      });

      it('should get workflow runs from storage', async () => {
        const step1Action = vi.fn<any>().mockResolvedValue({ result: 'success1' });
        const step2Action = vi.fn<any>().mockResolvedValue({ result: 'success2' });

        const step1 = createStep({
          id: 'step1',
          execute: step1Action,
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });
        const step2 = createStep({
          id: 'step2',
          execute: step2Action,
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        const workflow = createWorkflow({ id: 'test-workflow', inputSchema: z.object({}), outputSchema: z.object({}) });
        workflow.then(step1).then(step2).commit();

        const mastra = new Mastra({
          workflows: {
            'test-workflow': workflow,
          },
          logger: false,
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        // Create a few runs
        const run1 = await workflow.createRun();
        await run1.start({ inputData: {} });

        const run2 = await workflow.createRun();
        await run2.start({ inputData: {} });

        const { runs, total } = await workflow.listWorkflowRuns();
        expect(total).toBe(2);
        expect(runs).toHaveLength(2);
        expect(runs.map(r => r.runId)).toEqual(expect.arrayContaining([run1.runId, run2.runId]));
        expect(runs[0]?.workflowName).toBe('test-workflow');
        expect(runs[0]?.snapshot).toBeDefined();
        expect(runs[1]?.snapshot).toBeDefined();
        await mastra.stopEventEngine();
      });

      it('should get workflow run by id from storage', async () => {
        const step1Action = vi.fn<any>().mockResolvedValue({ result: 'success1' });
        const step2Action = vi.fn<any>().mockResolvedValue({ result: 'success2' });

        const step1 = createStep({
          id: 'step1',
          execute: step1Action,
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });
        const step2 = createStep({
          id: 'step2',
          execute: step2Action,
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });

        const workflow = createWorkflow({ id: 'test-workflow', inputSchema: z.object({}), outputSchema: z.object({}) });
        workflow.then(step1).then(step2).commit();

        const mastra = new Mastra({
          logger: false,
          storage: testStorage,
          workflows: {
            'test-workflow': workflow,
          },
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        // Create a few runs
        const run1 = await workflow.createRun();
        await run1.start({ inputData: {} });

        const { runs, total } = await workflow.listWorkflowRuns();
        expect(total).toBe(1);
        expect(runs).toHaveLength(1);
        expect(runs.map(r => r.runId)).toEqual(expect.arrayContaining([run1.runId]));
        expect(runs[0]?.workflowName).toBe('test-workflow');
        expect(runs[0]?.snapshot).toBeDefined();

        const run3 = await workflow.getWorkflowRunById(run1.runId);
        expect(run3?.runId).toBe(run1.runId);
        expect(run3?.workflowName).toBe('test-workflow');
        expect(run3?.snapshot).toEqual(runs[0].snapshot);
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('Agent as step', () => {
      it('should be able to use an agent as a step', async () => {
        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({
            prompt1: z.string(),
            prompt2: z.string(),
          }),
          outputSchema: z.object({}),
        });

        const agent = new Agent({
          id: 'test-agent-1',
          name: 'test-agent-1',
          instructions: 'test agent instructions',
          model: new MockLanguageModelV1({
            doStream: async () => ({
              stream: simulateReadableStream({
                chunks: [
                  { type: 'text-delta', textDelta: 'Paris' },
                  {
                    type: 'finish',
                    finishReason: 'stop',
                    logprobs: undefined,
                    usage: { completionTokens: 10, promptTokens: 3 },
                  },
                ],
              }),
              rawCall: { rawPrompt: null, rawSettings: {} },
            }),
          }),
        });

        const agent2 = new Agent({
          id: 'test-agent-2',
          name: 'test-agent-2',
          instructions: 'test agent instructions',
          model: new MockLanguageModelV1({
            doStream: async () => ({
              stream: simulateReadableStream({
                chunks: [
                  { type: 'text-delta', textDelta: 'London' },
                  {
                    type: 'finish',
                    finishReason: 'stop',
                    logprobs: undefined,
                    usage: { completionTokens: 10, promptTokens: 3 },
                  },
                ],
              }),
              rawCall: { rawPrompt: null, rawSettings: {} },
            }),
          }),
        });

        const startStep = createStep({
          id: 'start',
          inputSchema: z.object({
            prompt1: z.string(),
            prompt2: z.string(),
          }),
          outputSchema: z.object({ prompt1: z.string(), prompt2: z.string() }),
          execute: async ({ inputData }) => {
            return {
              prompt1: inputData.prompt1,
              prompt2: inputData.prompt2,
            };
          },
        });

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          agents: { 'test-agent-1': agent, 'test-agent-2': agent2 },
          logger: false,
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();
        const agentStep1 = createStep(agent);
        const agentStep2 = createStep(agent2);

        workflow
          .then(startStep)
          .map({
            prompt: {
              step: startStep,
              path: 'prompt1',
            },
          })
          .then(agentStep1)
          .map({
            prompt: {
              step: startStep,
              path: 'prompt2',
            },
          })
          .then(agentStep2)
          .commit();

        const run = await workflow.createRun();
        const result = await run.start({
          inputData: { prompt1: 'Capital of France, just the name', prompt2: 'Capital of UK, just the name' },
        });

        expect(result.steps['test-agent-1']).toEqual({
          status: 'success',
          output: { text: 'Paris' },
          payload: {
            prompt: 'Capital of France, just the name',
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });

        expect(result.steps['test-agent-2']).toEqual({
          status: 'success',
          output: { text: 'London' },
          payload: {
            prompt: 'Capital of UK, just the name',
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });

      it('should be able to use an agent in parallel', async () => {
        const execute = vi.fn<any>().mockResolvedValue({ result: 'success' });
        const finalStep = createStep({
          id: 'finalStep',
          inputSchema: z.object({
            'nested-workflow': z.object({ text: z.string() }),
            'nested-workflow-2': z.object({ text: z.string() }),
          }),
          outputSchema: z.object({
            result: z.string(),
          }),
          execute,
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({
            prompt1: z.string(),
            prompt2: z.string(),
          }),
          outputSchema: z.object({
            'nested-workflow': z.object({ text: z.string() }),
            'nested-workflow-2': z.object({ text: z.string() }),
          }),
        });

        const agent = new Agent({
          id: 'test-agent-1',
          name: 'test-agent-1',
          instructions: 'test agent instructions',
          model: new MockLanguageModelV1({
            doStream: async () => ({
              stream: simulateReadableStream({
                chunks: [
                  { type: 'text-delta', textDelta: 'Paris' },
                  {
                    type: 'finish',
                    finishReason: 'stop',
                    logprobs: undefined,
                    usage: { completionTokens: 10, promptTokens: 3 },
                  },
                ],
              }),
              rawCall: { rawPrompt: null, rawSettings: {} },
            }),
          }),
        });

        const agent2 = new Agent({
          id: 'test-agent-2',
          name: 'test-agent-2',
          instructions: 'test agent instructions',
          model: new MockLanguageModelV1({
            doStream: async () => ({
              stream: simulateReadableStream({
                chunks: [
                  { type: 'text-delta', textDelta: 'London' },
                  {
                    type: 'finish',
                    finishReason: 'stop',
                    logprobs: undefined,
                    usage: { completionTokens: 10, promptTokens: 3 },
                  },
                ],
              }),
              rawCall: { rawPrompt: null, rawSettings: {} },
            }),
          }),
        });

        const startStep = createStep({
          id: 'start',
          inputSchema: z.object({
            prompt1: z.string(),
            prompt2: z.string(),
          }),
          outputSchema: z.object({ prompt1: z.string(), prompt2: z.string() }),
          execute: async ({ inputData }) => {
            return {
              prompt1: inputData.prompt1,
              prompt2: inputData.prompt2,
            };
          },
        });

        const mastra = new Mastra({
          logger: false,
          storage: testStorage,
          workflows: { 'test-workflow': workflow },
          agents: { 'test-agent-1': agent, 'test-agent-2': agent2 },
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const nestedWorkflow1 = createWorkflow({
          id: 'nested-workflow',
          inputSchema: z.object({ prompt1: z.string(), prompt2: z.string() }),
          outputSchema: z.object({ text: z.string() }),
        })
          .then(startStep)
          .map({
            prompt: {
              step: startStep,
              path: 'prompt1',
            },
          })
          .then(createStep(agent))
          .commit();

        const nestedWorkflow2 = createWorkflow({
          id: 'nested-workflow-2',
          inputSchema: z.object({ prompt1: z.string(), prompt2: z.string() }),
          outputSchema: z.object({ text: z.string() }),
        })
          .then(startStep)
          .map({
            prompt: {
              step: startStep,
              path: 'prompt2',
            },
          })
          .then(createStep(agent2))
          .commit();

        workflow.parallel([nestedWorkflow1, nestedWorkflow2]).then(finalStep).commit();

        const run = workflow.createRun();
        const result = await run.start({
          inputData: { prompt1: 'Capital of France, just the name', prompt2: 'Capital of UK, just the name' },
        });

        expect(execute).toHaveBeenCalledTimes(1);
        expect(result.steps['finalStep']).toEqual({
          status: 'success',
          output: { result: 'success' },
          payload: {
            'nested-workflow': {
              text: 'Paris',
            },
            'nested-workflow-2': {
              text: 'London',
            },
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });

        expect(result.steps['nested-workflow']).toEqual({
          status: 'success',
          output: { text: 'Paris' },
          payload: {
            prompt1: 'Capital of France, just the name',
            prompt2: 'Capital of UK, just the name',
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });

        expect(result.steps['nested-workflow-2']).toEqual({
          status: 'success',
          output: { text: 'London' },
          payload: {
            prompt1: 'Capital of France, just the name',
            prompt2: 'Capital of UK, just the name',
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });

      it('should be able to use an agent as a step via mastra instance', async () => {
        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({
            prompt1: z.string(),
            prompt2: z.string(),
          }),
          outputSchema: z.object({}),
        });

        const agent = new Agent({
          id: 'test-agent-1',
          name: 'test-agent-1',
          instructions: 'test agent instructions',
          model: new MockLanguageModelV1({
            doGenerate: async () => ({
              rawCall: { rawPrompt: null, rawSettings: {} },
              finishReason: 'stop',
              usage: { promptTokens: 10, completionTokens: 20 },
              text: `Paris`,
            }),
          }),
        });

        const agent2 = new Agent({
          id: 'test-agent-2',
          name: 'test-agent-2',
          instructions: 'test agent instructions',
          model: new MockLanguageModelV1({
            doGenerate: async () => ({
              rawCall: { rawPrompt: null, rawSettings: {} },
              finishReason: 'stop',
              usage: { promptTokens: 10, completionTokens: 20 },
              text: `London`,
            }),
          }),
        });

        const startStep = createStep({
          id: 'start',
          inputSchema: z.object({
            prompt1: z.string(),
            prompt2: z.string(),
          }),
          outputSchema: z.object({ prompt1: z.string(), prompt2: z.string() }),
          execute: async ({ inputData }) => {
            return {
              prompt1: inputData.prompt1,
              prompt2: inputData.prompt2,
            };
          },
        });

        const mastra = new Mastra({
          logger: false,
          storage: testStorage,
          workflows: { 'test-workflow': workflow },
          agents: { 'test-agent-1': agent, 'test-agent-2': agent2 },
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        workflow
          .then(startStep)
          .map({
            prompt: {
              step: startStep,
              path: 'prompt1',
            },
          })
          .then(
            createStep({
              id: 'agent-step-1',
              inputSchema: z.object({ prompt: z.string() }),
              outputSchema: z.object({ text: z.string() }),
              execute: async ({ inputData, mastra }) => {
                const agent = mastra.getAgent('test-agent-1');
                const result = await agent.generateLegacy([{ role: 'user', content: inputData.prompt }]);
                return { text: result.text };
              },
            }),
          )
          .map({
            prompt: {
              step: startStep,
              path: 'prompt2',
            },
          })
          .then(
            createStep({
              id: 'agent-step-2',
              inputSchema: z.object({ prompt: z.string() }),
              outputSchema: z.object({ text: z.string() }),
              execute: async ({ inputData, mastra }) => {
                const agent = mastra.getAgent('test-agent-2');
                const result = await agent.generateLegacy([{ role: 'user', content: inputData.prompt }]);
                return { text: result.text };
              },
            }),
          )

          .commit();

        const run = workflow.createRun();
        const result = await run.start({
          inputData: { prompt1: 'Capital of France, just the name', prompt2: 'Capital of UK, just the name' },
        });

        expect(result.steps['agent-step-1']).toEqual({
          status: 'success',
          output: { text: 'Paris' },
          payload: {
            prompt: 'Capital of France, just the name',
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });

        expect(result.steps['agent-step-2']).toEqual({
          status: 'success',
          output: { text: 'London' },
          payload: {
            prompt: 'Capital of UK, just the name',
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });

      it('should be able to use an agent as a step in nested workflow via mastra instance', async () => {
        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({
            prompt1: z.string(),
            prompt2: z.string(),
          }),
          outputSchema: z.object({}),
        });

        const agent = new Agent({
          id: 'test-agent-1',
          name: 'test-agent-1',
          instructions: 'test agent instructions',
          model: new MockLanguageModelV1({
            doGenerate: async () => ({
              rawCall: { rawPrompt: null, rawSettings: {} },
              finishReason: 'stop',
              usage: { promptTokens: 10, completionTokens: 20 },
              text: `Paris`,
            }),
          }),
        });
        const agent2 = new Agent({
          id: 'test-agent-2',
          name: 'test-agent-2',
          instructions: 'test agent instructions',
          model: new MockLanguageModelV1({
            doGenerate: async () => ({
              rawCall: { rawPrompt: null, rawSettings: {} },
              finishReason: 'stop',
              usage: { promptTokens: 10, completionTokens: 20 },
              text: `London`,
            }),
          }),
        });
        const mastra = new Mastra({
          logger: false,
          storage: testStorage,
          workflows: { 'test-workflow': workflow },
          agents: { 'test-agent-1': agent, 'test-agent-2': agent2 },
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const agentStep = createStep({
          id: 'agent-step',
          inputSchema: z.object({ agentName: z.string(), prompt: z.string() }),
          outputSchema: z.object({ text: z.string() }),
          execute: async ({ inputData, mastra }) => {
            const agent = mastra.getAgent(inputData.agentName);
            const result = await agent.generateLegacy([{ role: 'user', content: inputData.prompt }]);
            return { text: result.text };
          },
        });

        const agentStep2 = cloneStep(agentStep, { id: 'agent-step-2' });

        workflow
          .then(
            createWorkflow({
              id: 'nested-workflow',
              inputSchema: z.object({ prompt1: z.string(), prompt2: z.string() }),
              outputSchema: z.object({ text: z.string() }),
            })
              .map({
                agentName: {
                  value: 'test-agent-1',
                  schema: z.string(),
                },
                prompt: {
                  initData: workflow,
                  path: 'prompt1',
                },
              })
              .then(agentStep)
              .map({
                agentName: {
                  value: 'test-agent-2',
                  schema: z.string(),
                },
                prompt: {
                  initData: workflow,
                  path: 'prompt2',
                },
              })
              .then(agentStep2)
              .then(
                createStep({
                  id: 'final-step',
                  inputSchema: z.object({ text: z.string() }),
                  outputSchema: z.object({ text: z.string() }),
                  execute: async ({ getStepResult }) => {
                    return { text: `${getStepResult(agentStep)?.text} ${getStepResult(agentStep2)?.text}` };
                  },
                }),
              )
              .commit(),
          )
          .commit();

        const run = workflow.createRun();
        const result = await run.start({
          inputData: { prompt1: 'Capital of France, just the name', prompt2: 'Capital of UK, just the name' },
        });

        expect(result.steps['nested-workflow']).toEqual({
          status: 'success',
          output: { text: 'Paris London' },
          payload: {
            prompt1: 'Capital of France, just the name',
            prompt2: 'Capital of UK, just the name',
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('Nested workflows', () => {
      it('should be able to nest workflows', async () => {
        const start = vi.fn().mockImplementation(async ({ inputData }) => {
          // Get the current value (either from trigger or previous increment)
          const currentValue = inputData.startValue || 0;

          // Increment the value
          const newValue = currentValue + 1;

          return { newValue };
        });
        const startStep = createStep({
          id: 'start',
          inputSchema: z.object({ startValue: z.number() }),
          outputSchema: z.object({
            newValue: z.number(),
          }),
          execute: start,
        });

        const other = vi.fn().mockImplementation(async () => {
          return { other: 26 };
        });
        const otherStep = createStep({
          id: 'other',
          inputSchema: z.object({ newValue: z.number() }),
          outputSchema: z.object({ other: z.number() }),
          execute: other,
        });

        const final = vi.fn().mockImplementation(async ({ getStepResult }) => {
          const startVal = getStepResult(startStep)?.newValue ?? 0;
          const otherVal = getStepResult(otherStep)?.other ?? 0;
          return { finalValue: startVal + otherVal };
        });
        const last = vi.fn().mockImplementation(async () => {
          return { success: true };
        });
        const finalStep = createStep({
          id: 'final',
          inputSchema: z.object({ newValue: z.number(), other: z.number() }),
          outputSchema: z.object({ success: z.boolean() }),
          execute: final,
        });

        const counterWorkflow = createWorkflow({
          id: 'counter-workflow',
          inputSchema: z.object({
            startValue: z.number(),
          }),
          outputSchema: z.object({ success: z.boolean() }),
        });

        const wfA = createWorkflow({
          id: 'nested-workflow-a',
          inputSchema: counterWorkflow.inputSchema,
          outputSchema: z.object({ success: z.boolean() }),
        })
          .then(startStep)
          .then(otherStep)
          .then(finalStep)
          .commit();
        const wfB = createWorkflow({
          id: 'nested-workflow-b',
          inputSchema: counterWorkflow.inputSchema,
          outputSchema: z.object({ success: z.boolean() }),
        })
          .then(startStep)
          .then(finalStep)
          .commit();
        counterWorkflow
          .parallel([wfA, wfB])
          .then(
            createStep({
              id: 'last-step',
              inputSchema: z.object({
                'nested-workflow-a': z.object({ success: z.boolean() }),
                'nested-workflow-b': z.object({ success: z.boolean() }),
              }),
              outputSchema: z.object({ success: z.boolean() }),
              execute: last,
            }),
          )
          .commit();

        const mastra = new Mastra({
          workflows: {
            'counter-workflow': counterWorkflow,
          },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = counterWorkflow.createRun();
        const result = await run.start({ inputData: { startValue: 0 } });

        expect(start).toHaveBeenCalledTimes(2);
        expect(other).toHaveBeenCalledTimes(1);
        expect(final).toHaveBeenCalledTimes(2);
        expect(last).toHaveBeenCalledTimes(1);
        // @ts-ignore
        expect(result.steps['nested-workflow-a'].output).toEqual({
          finalValue: 26 + 1,
        });

        // @ts-ignore
        expect(result.steps['nested-workflow-b'].output).toEqual({
          finalValue: 1,
        });

        expect(result.steps['last-step']).toEqual({
          output: { success: true },
          status: 'success',
          payload: {
            'nested-workflow-a': {
              finalValue: 27,
            },
            'nested-workflow-b': {
              finalValue: 1,
            },
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });

      it('should be able to nest workflows sequentially', async () => {
        const start = vi.fn().mockImplementation(async ({ inputData }) => {
          // Get the current value (either from trigger or previous increment)
          const currentValue = inputData.startValue || 0;

          // Increment the value
          const newValue = currentValue + 1;

          return { newValue };
        });
        const startStep = createStep({
          id: 'start',
          inputSchema: z.object({ startValue: z.number() }),
          outputSchema: z.object({
            newValue: z.number(),
          }),
          execute: start,
        });

        const other = vi.fn().mockImplementation(async ({ inputData }) => {
          return { other: inputData.newValue + 26 };
        });
        const otherStep = createStep({
          id: 'other',
          inputSchema: z.object({ newValue: z.number() }),
          outputSchema: z.object({ other: z.number() }),
          execute: other,
        });

        const final = vi.fn().mockImplementation(async ({ getStepResult }) => {
          const otherVal = getStepResult(otherStep)?.other ?? 0;
          return { finalValue: 1 + otherVal };
        });
        const finalStep = createStep({
          id: 'final',
          inputSchema: z.object({ newValue: z.number(), other: z.number() }),
          outputSchema: z.object({ success: z.boolean() }),
          execute: final,
        });

        const counterWorkflow = createWorkflow({
          id: 'counter-workflow',
          inputSchema: z.object({
            startValue: z.number(),
          }),
          outputSchema: z.object({ success: z.boolean() }),
        });

        const wfA = createWorkflow({
          id: 'nested-workflow-a',
          inputSchema: counterWorkflow.inputSchema,
          outputSchema: startStep.outputSchema,
        })
          .then(startStep)
          .commit();
        const wfB = createWorkflow({
          id: 'nested-workflow-b',
          inputSchema: wfA.outputSchema,
          outputSchema: z.object({ success: z.boolean() }),
        })
          .then(otherStep)
          .then(finalStep)
          .commit();
        counterWorkflow.then(wfA).then(wfB).commit();

        const mastra = new Mastra({
          workflows: {
            'counter-workflow': counterWorkflow,
          },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = counterWorkflow.createRun();
        const result = await run.start({ inputData: { startValue: 0 } });

        expect(start).toHaveBeenCalledTimes(1);
        expect(other).toHaveBeenCalledTimes(1);
        expect(final).toHaveBeenCalledTimes(1);
        // @ts-ignore
        expect(result.steps['nested-workflow-a'].output).toEqual({
          newValue: 1,
        });

        // @ts-ignore
        expect(result.steps['nested-workflow-b'].output).toEqual({
          finalValue: 28,
        });
        await mastra.stopEventEngine();
      });

      it('should be able clone workflows as steps', async () => {
        const start = vi.fn().mockImplementation(async ({ inputData }) => {
          // Get the current value (either from trigger or previous increment)
          const currentValue = inputData.startValue || 0;

          // Increment the value
          const newValue = currentValue + 1;

          return { newValue };
        });
        const startStep = createStep({
          id: 'start',
          inputSchema: z.object({ startValue: z.number() }),
          outputSchema: z.object({
            newValue: z.number(),
          }),
          execute: start,
        });

        const other = vi.fn().mockImplementation(async () => {
          return { other: 26 };
        });
        const otherStep = createStep({
          id: 'other',
          inputSchema: z.object({ newValue: z.number() }),
          outputSchema: z.object({ other: z.number() }),
          execute: other,
        });

        const final = vi.fn().mockImplementation(async ({ getStepResult }) => {
          const startVal = getStepResult(startStep)?.newValue ?? 0;
          const otherVal = getStepResult(cloneStep(otherStep, { id: 'other-clone' }))?.other ?? 0;
          return { finalValue: startVal + otherVal };
        });
        const last = vi.fn().mockImplementation(async () => {
          return { success: true };
        });
        const finalStep = createStep({
          id: 'final',
          inputSchema: z.object({ newValue: z.number(), other: z.number() }),
          outputSchema: z.object({ success: z.boolean() }),
          execute: final,
        });

        const counterWorkflow = createWorkflow({
          id: 'counter-workflow',
          inputSchema: z.object({
            startValue: z.number(),
          }),
          outputSchema: z.object({ success: z.boolean() }),
        });

        const wfA = createWorkflow({
          id: 'nested-workflow-a',
          inputSchema: counterWorkflow.inputSchema,
          outputSchema: z.object({ success: z.boolean() }),
        })
          .then(startStep)
          .then(cloneStep(otherStep, { id: 'other-clone' }))
          .then(finalStep)
          .commit();
        const wfB = createWorkflow({
          id: 'nested-workflow-b',
          inputSchema: counterWorkflow.inputSchema,
          outputSchema: z.object({ success: z.boolean() }),
        })
          .then(startStep)
          .then(cloneStep(finalStep, { id: 'final-clone' }))
          .commit();

        const wfAClone = cloneWorkflow(wfA, { id: 'nested-workflow-a-clone' });

        counterWorkflow
          .parallel([wfAClone, wfB])
          .then(
            createStep({
              id: 'last-step',
              inputSchema: z.object({
                'nested-workflow-b': z.object({ success: z.boolean() }),
                'nested-workflow-a-clone': z.object({ success: z.boolean() }),
              }),
              outputSchema: z.object({ success: z.boolean() }),
              execute: last,
            }),
          )
          .commit();

        const mastra = new Mastra({
          workflows: {
            'counter-workflow': counterWorkflow,
          },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = counterWorkflow.createRun();
        const result = await run.start({ inputData: { startValue: 0 } });

        expect(start).toHaveBeenCalledTimes(2);
        expect(other).toHaveBeenCalledTimes(1);
        expect(final).toHaveBeenCalledTimes(2);
        expect(last).toHaveBeenCalledTimes(1);
        // @ts-ignore
        expect(result.steps['nested-workflow-a-clone'].output).toEqual({
          finalValue: 26 + 1,
        });

        // @ts-ignore
        expect(result.steps['nested-workflow-b'].output).toEqual({
          finalValue: 1,
        });

        expect(result.steps['last-step']).toEqual({
          output: { success: true },
          status: 'success',
          payload: {
            'nested-workflow-a-clone': {
              finalValue: 27,
            },
            'nested-workflow-b': {
              finalValue: 1,
            },
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });

      it('should be able to nest workflows with conditions', async () => {
        const start = vi.fn().mockImplementation(async ({ inputData }) => {
          // Get the current value (either from trigger or previous increment)
          const currentValue = inputData.startValue || 0;

          // Increment the value
          const newValue = currentValue + 1;

          return { newValue };
        });
        const startStep = createStep({
          id: 'start',
          inputSchema: z.object({ startValue: z.number() }),
          outputSchema: z.object({
            newValue: z.number(),
          }),
          execute: start,
        });

        const other = vi.fn().mockImplementation(async () => {
          return { other: 26 };
        });
        const otherStep = createStep({
          id: 'other',
          inputSchema: z.object({ newValue: z.number() }),
          outputSchema: z.object({ other: z.number() }),
          execute: other,
        });

        const final = vi.fn().mockImplementation(async ({ getStepResult }) => {
          const startVal = getStepResult(startStep)?.newValue ?? 0;
          const otherVal = getStepResult(otherStep)?.other ?? 0;
          return { finalValue: startVal + otherVal };
        });
        const last = vi.fn().mockImplementation(async () => {
          return { success: true };
        });
        const finalStep = createStep({
          id: 'final',
          inputSchema: z.object({ newValue: z.number(), other: z.number() }),
          outputSchema: z.object({ finalValue: z.number() }),
          execute: final,
        });

        const counterWorkflow = createWorkflow({
          id: 'counter-workflow',
          inputSchema: z.object({
            startValue: z.number(),
          }),
          outputSchema: z.object({ success: z.boolean() }),
        });

        const wfA = createWorkflow({
          id: 'nested-workflow-a',
          inputSchema: counterWorkflow.inputSchema,
          outputSchema: finalStep.outputSchema,
        })
          .then(startStep)
          .then(otherStep)
          .then(finalStep)
          .commit();
        const wfB = createWorkflow({
          id: 'nested-workflow-b',
          inputSchema: counterWorkflow.inputSchema,
          outputSchema: z.object({ other: otherStep.outputSchema, final: finalStep.outputSchema }),
        })
          .then(startStep)
          .branch([
            [async () => false, otherStep],
            // @ts-ignore
            [async () => true, finalStep],
          ])
          .map({
            finalValue: mapVariable({
              step: finalStep,
              path: 'finalValue',
            }),
          })
          .commit();
        counterWorkflow
          .parallel([wfA, wfB])
          .then(
            createStep({
              id: 'last-step',
              inputSchema: z.object({
                'nested-workflow-a': wfA.outputSchema,
                'nested-workflow-b': wfB.outputSchema,
              }),
              outputSchema: z.object({ success: z.boolean() }),
              execute: last,
            }),
          )
          .commit();

        const mastra = new Mastra({
          workflows: { 'counter-workflow': counterWorkflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = counterWorkflow.createRun();
        const result = await run.start({ inputData: { startValue: 0 } });

        expect(start).toHaveBeenCalledTimes(2);
        expect(other).toHaveBeenCalledTimes(1);
        expect(final).toHaveBeenCalledTimes(2);
        expect(last).toHaveBeenCalledTimes(1);
        // @ts-ignore
        expect(result.steps['nested-workflow-a'].output).toEqual({
          finalValue: 26 + 1,
        });

        // @ts-ignore
        expect(result.steps['nested-workflow-b'].output).toEqual({
          finalValue: 1,
        });

        expect(result.steps['last-step']).toEqual({
          output: { success: true },
          status: 'success',
          payload: {
            'nested-workflow-a': {
              finalValue: 27,
            },
            'nested-workflow-b': {
              finalValue: 1,
            },
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });

      describe.sequential('new if else branching syntax with nested workflows', () => {
        it('should execute if-branch', async () => {
          const start = vi.fn().mockImplementation(async ({ inputData }) => {
            // Get the current value (either from trigger or previous increment)
            const currentValue = inputData.startValue || 0;

            // Increment the value
            const newValue = currentValue + 1;

            return { newValue };
          });
          const startStep = createStep({
            id: 'start',
            inputSchema: z.object({ startValue: z.number() }),
            outputSchema: z.object({
              newValue: z.number(),
            }),
            execute: start,
          });

          const other = vi.fn().mockImplementation(async () => {
            return { other: 26 };
          });
          const otherStep = createStep({
            id: 'other',
            inputSchema: z.object({ newValue: z.number() }),
            outputSchema: z.object({ other: z.number() }),
            execute: other,
          });

          const final = vi.fn().mockImplementation(async ({ getStepResult }) => {
            const startVal = getStepResult(startStep)?.newValue ?? 0;
            const otherVal = getStepResult(otherStep)?.other ?? 0;
            return { finalValue: startVal + otherVal };
          });
          const first = vi.fn().mockImplementation(async () => {
            return { success: true };
          });
          const last = vi.fn().mockImplementation(async () => {
            return { success: true };
          });
          const finalStep = createStep({
            id: 'final',
            inputSchema: z.object({ newValue: z.number(), other: z.number() }),
            outputSchema: z.object({ finalValue: z.number() }),
            execute: final,
          });

          const counterWorkflow = createWorkflow({
            id: 'counter-workflow',
            inputSchema: z.object({
              startValue: z.number(),
            }),
            outputSchema: z.object({ success: z.boolean() }),
          });

          const wfA = createWorkflow({
            id: 'nested-workflow-a',
            inputSchema: counterWorkflow.inputSchema,
            outputSchema: finalStep.outputSchema,
          })
            .then(startStep)
            .then(otherStep)
            .then(finalStep)
            .commit();
          const wfB = createWorkflow({
            id: 'nested-workflow-b',
            inputSchema: counterWorkflow.inputSchema,
            outputSchema: finalStep.outputSchema,
          })
            .then(startStep)
            .then(finalStep)
            .commit();
          counterWorkflow
            .then(
              createStep({
                id: 'first-step',
                inputSchema: z.object({ startValue: z.number() }),
                outputSchema: wfA.inputSchema,
                execute: first,
              }),
            )
            .branch([
              [async () => true, wfA],
              [async () => false, wfB],
            ])
            .then(
              createStep({
                id: 'last-step',
                inputSchema: z.object({
                  'nested-workflow-a': wfA.outputSchema,
                  'nested-workflow-b': wfB.outputSchema,
                }),
                outputSchema: z.object({ success: z.boolean() }),
                execute: last,
              }),
            )
            .commit();

          const mastra = new Mastra({
            workflows: { 'counter-workflow': counterWorkflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = counterWorkflow.createRun();
          const result = await run.start({ inputData: { startValue: 0 } });

          expect(start).toHaveBeenCalledTimes(1);
          expect(other).toHaveBeenCalledTimes(1);
          expect(final).toHaveBeenCalledTimes(1);
          expect(first).toHaveBeenCalledTimes(1);
          expect(last).toHaveBeenCalledTimes(1);
          // @ts-ignore
          expect(result.steps['nested-workflow-a'].output).toEqual({
            finalValue: 26 + 1,
          });

          expect(result.steps['first-step']).toEqual({
            output: { success: true },
            status: 'success',
            payload: {
              startValue: 0,
            },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });

          expect(result.steps['last-step']).toEqual({
            output: { success: true },
            status: 'success',
            payload: {
              'nested-workflow-a': {
                finalValue: 27,
              },
              'nested-workflow-b': undefined,
            },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          await mastra.stopEventEngine();
        });

        it('should execute else-branch', async () => {
          const start = vi.fn().mockImplementation(async ({ inputData }) => {
            // Get the current value (either from trigger or previous increment)
            const currentValue = inputData.startValue || 0;

            // Increment the value
            const newValue = currentValue + 1;

            return { newValue };
          });
          const startStep = createStep({
            id: 'start',
            inputSchema: z.object({ startValue: z.number() }),
            outputSchema: z.object({
              newValue: z.number(),
            }),
            execute: start,
          });

          const other = vi.fn().mockImplementation(async () => {
            return { other: 26 };
          });
          const otherStep = createStep({
            id: 'other',
            inputSchema: z.object({ newValue: z.number() }),
            outputSchema: z.object({ other: z.number() }),
            execute: other,
          });

          const final = vi.fn().mockImplementation(async ({ getStepResult }) => {
            const startVal = getStepResult(startStep)?.newValue ?? 0;
            const otherVal = getStepResult(otherStep)?.other ?? 0;
            return { finalValue: startVal + otherVal };
          });
          const first = vi.fn().mockImplementation(async () => {
            return { success: true };
          });
          const last = vi.fn().mockImplementation(async () => {
            return { success: true };
          });
          const finalStep = createStep({
            id: 'final',
            inputSchema: z.object({ newValue: z.number(), other: z.number() }),
            outputSchema: z.object({ finalValue: z.number() }),
            execute: final,
          });

          const counterWorkflow = createWorkflow({
            id: 'counter-workflow',
            inputSchema: z.object({
              startValue: z.number(),
            }),
            outputSchema: z.object({ success: z.boolean() }),
          });

          const wfA = createWorkflow({
            id: 'nested-workflow-a',
            inputSchema: counterWorkflow.inputSchema,
            outputSchema: finalStep.outputSchema,
          })
            .then(startStep)
            .then(otherStep)
            .then(finalStep)
            .commit();
          const wfB = createWorkflow({
            id: 'nested-workflow-b',
            inputSchema: counterWorkflow.inputSchema,
            outputSchema: finalStep.outputSchema,
          })
            .then(startStep)
            .then(finalStep)
            .commit();
          counterWorkflow
            .then(
              createStep({
                id: 'first-step',
                inputSchema: z.object({ startValue: z.number() }),
                outputSchema: wfA.inputSchema,
                execute: first,
              }),
            )
            .branch([
              [async () => false, wfA],
              [async () => true, wfB],
            ])
            .then(
              createStep({
                id: 'last-step',
                inputSchema: z.object({
                  'nested-workflow-a': wfA.outputSchema,
                  'nested-workflow-b': wfB.outputSchema,
                }),
                outputSchema: z.object({ success: z.boolean() }),
                execute: last,
              }),
            )
            .commit();

          const mastra = new Mastra({
            workflows: { 'counter-workflow': counterWorkflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = counterWorkflow.createRun();
          const result = await run.start({ inputData: { startValue: 0 } });

          expect(start).toHaveBeenCalledTimes(1);
          expect(other).toHaveBeenCalledTimes(0);
          expect(final).toHaveBeenCalledTimes(1);
          expect(first).toHaveBeenCalledTimes(1);
          expect(last).toHaveBeenCalledTimes(1);

          // @ts-ignore
          expect(result.steps['nested-workflow-b'].output).toEqual({
            finalValue: 1,
          });

          expect(result.steps['first-step']).toEqual({
            output: { success: true },
            status: 'success',
            payload: {
              startValue: 0,
            },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });

          expect(result.steps['last-step']).toEqual({
            output: { success: true },
            status: 'success',
            payload: {
              'nested-workflow-b': {
                finalValue: 1,
              },
              'nested-workflow-a': undefined,
            },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          await mastra.stopEventEngine();
        });

        it('should execute nested else and if-branch', async () => {
          const start = vi.fn().mockImplementation(async ({ inputData }) => {
            // Get the current value (either from trigger or previous increment)
            const currentValue = inputData.startValue || 0;

            // Increment the value
            const newValue = currentValue + 1;

            return { newValue };
          });
          const startStep = createStep({
            id: 'start',
            inputSchema: z.object({ startValue: z.number() }),
            outputSchema: z.object({
              newValue: z.number(),
            }),
            execute: start,
          });

          const other = vi.fn().mockImplementation(async () => {
            return { other: 26 };
          });
          const otherStep = createStep({
            id: 'other',
            inputSchema: z.object({ newValue: z.number() }),
            outputSchema: z.object({ other: z.number() }),
            execute: other,
          });

          const final = vi.fn().mockImplementation(async ({ getStepResult }) => {
            const startVal = getStepResult(startStep)?.newValue ?? 0;
            const otherVal = getStepResult(otherStep)?.other ?? 0;
            return { finalValue: startVal + otherVal };
          });
          const first = vi.fn().mockImplementation(async () => {
            return { success: true };
          });
          const last = vi.fn().mockImplementation(async () => {
            return { success: true };
          });
          const finalStep = createStep({
            id: 'final',
            inputSchema: z.object({ newValue: z.number(), other: z.number() }),
            outputSchema: z.object({ finalValue: z.number() }),
            execute: final,
          });

          const counterWorkflow = createWorkflow({
            id: 'counter-workflow',
            inputSchema: z.object({
              startValue: z.number(),
            }),
            outputSchema: z.object({ success: z.boolean() }),
          });

          const wfA = createWorkflow({
            id: 'nested-workflow-a',
            inputSchema: counterWorkflow.inputSchema,
            outputSchema: finalStep.outputSchema,
          })
            .then(startStep)
            .then(otherStep)
            .then(finalStep)
            .commit();
          const wfB = createWorkflow({
            id: 'nested-workflow-b',
            inputSchema: counterWorkflow.inputSchema,
            outputSchema: finalStep.outputSchema,
          })
            .then(startStep)
            .branch([
              [
                async () => true,
                createWorkflow({
                  id: 'nested-workflow-c',
                  inputSchema: startStep.outputSchema,
                  outputSchema: otherStep.outputSchema,
                })
                  .then(otherStep)
                  .commit(),
              ],
              [
                async () => false,
                createWorkflow({
                  id: 'nested-workflow-d',
                  inputSchema: startStep.outputSchema,
                  outputSchema: otherStep.outputSchema,
                })
                  .then(otherStep)
                  .commit(),
              ],
            ])
            // TODO: maybe make this a little nicer to do with .map()?
            .then(
              createStep({
                id: 'map-results',
                inputSchema: z.object({
                  'nested-workflow-c': otherStep.outputSchema,
                  'nested-workflow-d': otherStep.outputSchema,
                }),
                outputSchema: otherStep.outputSchema,
                execute: async ({ inputData }) => {
                  return { other: inputData['nested-workflow-c']?.other ?? inputData['nested-workflow-d']?.other };
                },
              }),
            )
            .then(finalStep)
            .commit();

          counterWorkflow
            .then(
              createStep({
                id: 'first-step',
                inputSchema: z.object({ startValue: z.number() }),
                outputSchema: wfA.inputSchema,
                execute: first,
              }),
            )
            .branch([
              [async () => false, wfA],
              [async () => true, wfB],
            ])
            .then(
              createStep({
                id: 'last-step',
                inputSchema: z.object({
                  'nested-workflow-a': wfA.outputSchema,
                  'nested-workflow-b': wfB.outputSchema,
                }),
                outputSchema: z.object({ success: z.boolean() }),
                execute: last,
              }),
            )
            .commit();

          const mastra = new Mastra({
            workflows: { 'counter-workflow': counterWorkflow },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = counterWorkflow.createRun();
          const result = await run.start({ inputData: { startValue: 1 } });

          expect(start).toHaveBeenCalledTimes(1);
          expect(other).toHaveBeenCalledTimes(1);
          expect(final).toHaveBeenCalledTimes(1);
          expect(first).toHaveBeenCalledTimes(1);
          expect(last).toHaveBeenCalledTimes(1);

          // @ts-ignore
          expect(result.steps['nested-workflow-b'].output).toEqual({
            finalValue: 1,
          });

          expect(result.steps['first-step']).toEqual({
            output: { success: true },
            status: 'success',
            payload: {
              startValue: 1,
            },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });

          expect(result.steps['last-step']).toEqual({
            output: { success: true },
            status: 'success',
            payload: {
              'nested-workflow-a': undefined,
              'nested-workflow-b': {
                finalValue: 1,
              },
            },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          await mastra.stopEventEngine();
        });
      });

      describe.sequential('suspending and resuming nested workflows', () => {
        it('should be able to suspend nested workflow step', async () => {
          const start = vi.fn().mockImplementation(async ({ inputData }) => {
            // Get the current value (either from trigger or previous increment)
            const currentValue = inputData.startValue || 0;

            // Increment the value
            const newValue = currentValue + 1;

            return { newValue };
          });
          const startStep = createStep({
            id: 'start',
            inputSchema: z.object({ startValue: z.number() }),
            outputSchema: z.object({
              newValue: z.number(),
            }),
            execute: start,
          });

          const other = vi.fn().mockImplementation(async ({ suspend, resumeData }) => {
            if (!resumeData) {
              return await suspend();
            }
            return { other: 26 };
          });
          const otherStep = createStep({
            id: 'other',
            inputSchema: z.object({ newValue: z.number() }),
            outputSchema: z.object({ other: z.number() }),
            execute: other,
          });

          const final = vi.fn().mockImplementation(async ({ getStepResult }) => {
            const startVal = getStepResult(startStep)?.newValue ?? 0;
            const otherVal = getStepResult(otherStep)?.other ?? 0;
            return { finalValue: startVal + otherVal };
          });
          const last = vi.fn().mockImplementation(async ({}) => {
            return { success: true };
          });
          const begin = vi.fn().mockImplementation(async ({ inputData }) => {
            return inputData;
          });
          const finalStep = createStep({
            id: 'final',
            inputSchema: z.object({ newValue: z.number(), other: z.number() }),
            outputSchema: z.object({
              finalValue: z.number(),
            }),
            execute: final,
          });

          const counterWorkflow = createWorkflow({
            id: 'counter-workflow',
            inputSchema: z.object({
              startValue: z.number(),
            }),
            outputSchema: z.object({
              finalValue: z.number(),
            }),
          });

          const wfA = createWorkflow({
            id: 'nested-workflow-a',
            inputSchema: counterWorkflow.inputSchema,
            outputSchema: finalStep.outputSchema,
          })
            .then(startStep)
            .then(otherStep)
            .then(finalStep)
            .commit();

          counterWorkflow
            .then(
              createStep({
                id: 'begin-step',
                inputSchema: counterWorkflow.inputSchema,
                outputSchema: counterWorkflow.inputSchema,
                execute: begin,
              }),
            )
            .then(wfA)
            .then(
              createStep({
                id: 'last-step',
                inputSchema: wfA.outputSchema,
                outputSchema: z.object({ success: z.boolean() }),
                execute: last,
              }),
            )
            .commit();

          const mastra = new Mastra({
            logger: false,
            storage: testStorage,
            workflows: { 'counter-workflow': counterWorkflow },
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = counterWorkflow.createRun();
          const result = await run.start({ inputData: { startValue: 0 } });
          expect(begin).toHaveBeenCalledTimes(1);
          expect(start).toHaveBeenCalledTimes(1);
          expect(other).toHaveBeenCalledTimes(1);
          expect(final).toHaveBeenCalledTimes(0);
          expect(last).toHaveBeenCalledTimes(0);
          expect(result.steps['nested-workflow-a']).toMatchObject({
            status: 'suspended',
          });

          // @ts-ignore
          expect(result.steps['last-step']).toEqual(undefined);

          const resumedResults = await run.resume({ step: [wfA, otherStep], resumeData: { newValue: 0 } });

          // @ts-ignore
          expect(resumedResults.steps['nested-workflow-a'].output).toEqual({
            finalValue: 26 + 1,
          });

          expect(start).toHaveBeenCalledTimes(1);
          expect(other).toHaveBeenCalledTimes(2);
          expect(final).toHaveBeenCalledTimes(1);
          expect(last).toHaveBeenCalledTimes(1);
          await mastra.stopEventEngine();
        });
      });

      describe.sequential('Workflow results', () => {
        it('should be able to spec out workflow result via variables', async () => {
          const start = vi.fn().mockImplementation(async ({ inputData }) => {
            // Get the current value (either from trigger or previous increment)
            const currentValue = inputData.startValue || 0;

            // Increment the value
            const newValue = currentValue + 1;

            return { newValue };
          });
          const startStep = createStep({
            id: 'start',
            inputSchema: z.object({ startValue: z.number() }),
            outputSchema: z.object({
              newValue: z.number(),
            }),
            execute: start,
          });

          const other = vi.fn().mockImplementation(async () => {
            return { other: 26 };
          });
          const otherStep = createStep({
            id: 'other',
            inputSchema: z.object({ newValue: z.number() }),
            outputSchema: z.object({ other: z.number() }),
            execute: other,
          });

          const final = vi.fn().mockImplementation(async ({ getStepResult }) => {
            const startVal = getStepResult(startStep)?.newValue ?? 0;
            const otherVal = getStepResult(otherStep)?.other ?? 0;
            return { finalValue: startVal + otherVal };
          });
          const last = vi.fn().mockImplementation(async () => {
            return { success: true };
          });
          const finalStep = createStep({
            id: 'final',
            inputSchema: z.object({ newValue: z.number(), other: z.number() }),
            outputSchema: z.object({
              finalValue: z.number(),
            }),
            execute: final,
          });

          const wfA = createWorkflow({
            steps: [startStep, otherStep, finalStep],
            id: 'nested-workflow-a',
            inputSchema: z.object({
              startValue: z.number(),
            }),
            outputSchema: z.object({
              finalValue: z.number(),
            }),
          })
            .then(startStep)
            .then(otherStep)
            .then(finalStep)
            .commit();

          const counterWorkflow = createWorkflow({
            id: 'counter-workflow',
            inputSchema: z.object({
              startValue: z.number(),
            }),
            outputSchema: z.object({
              finalValue: z.number(),
            }),
          });

          counterWorkflow
            .then(wfA)
            .then(
              createStep({
                id: 'last-step',
                inputSchema: wfA.outputSchema,
                outputSchema: z.object({ success: z.boolean() }),
                execute: last,
              }),
            )
            .commit();

          const mastra = new Mastra({
            workflows: {
              'counter-workflow': counterWorkflow,
            },
            storage: testStorage,
            pubsub: new GoogleCloudPubSub({
              projectId: 'pubsub-test',
            }),
          });
          await mastra.startEventEngine();

          const run = counterWorkflow.createRun();
          const result = await run.start({ inputData: { startValue: 0 } });
          const results = result.steps;

          expect(start).toHaveBeenCalledTimes(1);
          expect(other).toHaveBeenCalledTimes(1);
          expect(final).toHaveBeenCalledTimes(1);
          expect(last).toHaveBeenCalledTimes(1);

          // @ts-ignore
          expect(results['nested-workflow-a']).toMatchObject({
            status: 'success',
            output: {
              finalValue: 26 + 1,
            },
          });

          expect(result.steps['last-step']).toEqual({
            status: 'success',
            output: { success: true },
            payload: {
              finalValue: 26 + 1,
            },
            startedAt: expect.any(Number),
            endedAt: expect.any(Number),
          });
          await mastra.stopEventEngine();
        });
      });

      it('should be able to suspend nested workflow step in a nested workflow step', async () => {
        const start = vi.fn().mockImplementation(async ({ inputData }) => {
          // Get the current value (either from trigger or previous increment)
          const currentValue = inputData.startValue || 0;

          // Increment the value
          const newValue = currentValue + 1;

          return { newValue };
        });
        const startStep = createStep({
          id: 'start',
          inputSchema: z.object({ startValue: z.number() }),
          outputSchema: z.object({
            newValue: z.number(),
          }),
          execute: start,
        });

        const other = vi.fn().mockImplementation(async ({ suspend, resumeData }) => {
          if (!resumeData) {
            return await suspend();
          }
          return { other: 26 };
        });
        const otherStep = createStep({
          id: 'other',
          inputSchema: z.object({ newValue: z.number() }),
          outputSchema: z.object({ other: z.number() }),
          execute: other,
        });

        const final = vi.fn().mockImplementation(async ({ getStepResult }) => {
          const startVal = getStepResult(startStep)?.newValue ?? 0;
          const otherVal = getStepResult(otherStep)?.other ?? 0;
          return { finalValue: startVal + otherVal };
        });
        const last = vi.fn().mockImplementation(async ({}) => {
          return { success: true };
        });
        const begin = vi.fn().mockImplementation(async ({ inputData }) => {
          return inputData;
        });
        const finalStep = createStep({
          id: 'final',
          inputSchema: z.object({ newValue: z.number(), other: z.number() }),
          outputSchema: z.object({
            finalValue: z.number(),
          }),
          execute: final,
        });

        const counterInputSchema = z.object({
          startValue: z.number(),
        });
        const counterOutputSchema = z.object({
          finalValue: z.number(),
        });

        const passthroughStep = createStep({
          id: 'passthrough',
          inputSchema: counterInputSchema,
          outputSchema: counterInputSchema,
          execute: vi.fn().mockImplementation(async ({ inputData }) => {
            return inputData;
          }),
        });

        const wfA = createWorkflow({
          id: 'nested-workflow-a',
          inputSchema: counterInputSchema,
          outputSchema: finalStep.outputSchema,
        })
          .then(startStep)
          .then(otherStep)
          .then(finalStep)
          .commit();

        const wfB = createWorkflow({
          id: 'nested-workflow-b',
          inputSchema: counterInputSchema,
          outputSchema: finalStep.outputSchema,
        })
          .then(passthroughStep)
          .then(wfA)
          .commit();

        const wfC = createWorkflow({
          id: 'nested-workflow-c',
          inputSchema: counterInputSchema,
          outputSchema: finalStep.outputSchema,
        })
          .then(passthroughStep)
          .then(wfB)
          .commit();

        const counterWorkflow = createWorkflow({
          id: 'counter-workflow',
          inputSchema: counterInputSchema,
          outputSchema: counterOutputSchema,
          steps: [wfC, passthroughStep],
        });

        counterWorkflow
          .then(
            createStep({
              id: 'begin-step',
              inputSchema: counterWorkflow.inputSchema,
              outputSchema: counterWorkflow.inputSchema,
              execute: begin,
            }),
          )
          .then(wfC)
          .then(
            createStep({
              id: 'last-step',
              inputSchema: wfA.outputSchema,
              outputSchema: z.object({ success: z.boolean() }),
              execute: last,
            }),
          )
          .commit();

        const mastra = new Mastra({
          logger: false,
          storage: testStorage,
          workflows: { 'counter-workflow': counterWorkflow },
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = counterWorkflow.createRun();
        const result = await run.start({ inputData: { startValue: 0 } });

        expect(passthroughStep.execute).toHaveBeenCalledTimes(2);
        expect(result.steps['nested-workflow-c']).toMatchObject({
          status: 'suspended',
          suspendPayload: {
            __workflow_meta: {
              path: ['nested-workflow-c', 'nested-workflow-b', 'nested-workflow-a', 'other'],
            },
          },
        });

        // @ts-ignore
        expect(result.steps['last-step']).toEqual(undefined);

        if (result.status !== 'suspended') {
          expect.fail('Workflow should be suspended');
        }
        expect(result.suspended[0]).toEqual(['nested-workflow-c', 'nested-workflow-b', 'nested-workflow-a', 'other']);
        const resumedResults = await run.resume({ step: result.suspended[0], resumeData: { newValue: 0 } });

        // @ts-ignore
        expect(resumedResults.steps['nested-workflow-c'].output).toEqual({
          finalValue: 26 + 1,
        });

        expect(start).toHaveBeenCalledTimes(1);
        expect(other).toHaveBeenCalledTimes(2);
        expect(final).toHaveBeenCalledTimes(1);
        expect(last).toHaveBeenCalledTimes(1);
        expect(passthroughStep.execute).toHaveBeenCalledTimes(2);
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('Dependency Injection', () => {
      it('should inject requestContext dependencies into steps during run', async () => {
        const requestContext = new RequestContext();
        const testValue = 'test-dependency';
        requestContext.set('testKey', testValue);

        const step = createStep({
          id: 'step1',
          execute: async ({ requestContext }) => {
            const value = requestContext.get('testKey');
            return { injectedValue: value };
          },
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });
        const workflow = createWorkflow({ id: 'test-workflow', inputSchema: z.object({}), outputSchema: z.object({}) });
        workflow.then(step).commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = workflow.createRun();
        const result = await run.start({ requestContext });

        // @ts-ignore
        expect(result.steps.step1.output.injectedValue).toBe(testValue);
        await mastra.stopEventEngine();
      });

      it('should inject requestContext dependencies into steps during resume', async () => {
        const initialStorage = new MockStore();

        const requestContext = new RequestContext();
        const testValue = 'test-dependency';
        requestContext.set('testKey', testValue);

        const execute = vi.fn(async ({ requestContext, suspend, resumeData }) => {
          if (!resumeData?.human) {
            return await suspend();
          }

          const value = requestContext.get('testKey');
          return { injectedValue: value };
        });

        const step = createStep({
          id: 'step1',
          execute,
          inputSchema: z.object({ human: z.boolean() }),
          outputSchema: z.object({}),
        });
        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        });
        workflow.then(step).commit();

        const mastra = new Mastra({
          logger: false,
          storage: initialStorage,
          workflows: { 'test-workflow': workflow },
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = workflow.createRun();
        await run.start({ requestContext });

        const resumerequestContext = new RequestContext();
        resumerequestContext.set('testKey', testValue + '2');

        const result = await run.resume({
          step: step,
          resumeData: {
            human: true,
          },
          requestContext: resumerequestContext,
        });

        // @ts-ignore
        expect(result?.steps.step1.output.injectedValue).toBe(testValue + '2');
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('consecutive parallel executions', () => {
      it('should support consecutive parallel calls with proper type inference', async () => {
        // First parallel stage steps
        const step1 = createStep({
          id: 'step1',
          inputSchema: z.object({
            input: z.string(),
          }),
          outputSchema: z.object({
            result1: z.string(),
          }),
          execute: vi.fn<any>().mockImplementation(async ({ inputData }) => ({
            result1: `processed-${inputData.input}`,
          })),
        });

        const step2 = createStep({
          id: 'step2',
          inputSchema: z.object({
            input: z.string(),
          }),
          outputSchema: z.object({
            result2: z.string(),
          }),
          execute: vi.fn<any>().mockImplementation(async ({ inputData }) => ({
            result2: `transformed-${inputData.input}`,
          })),
        });

        // Second parallel stage steps
        const step3 = createStep({
          id: 'step3',
          inputSchema: z.object({
            step1: z.object({
              result1: z.string(),
            }),
            step2: z.object({
              result2: z.string(),
            }),
          }),
          outputSchema: z.object({
            result3: z.string(),
          }),
          execute: vi.fn<any>().mockImplementation(async ({ inputData }) => {
            return { result3: `combined-${inputData.step1.result1}-${inputData.step2.result2}` };
          }),
        });

        const step4 = createStep({
          id: 'step4',
          inputSchema: z.object({
            step1: z.object({
              result1: z.string(),
            }),
            step2: z.object({
              result2: z.string(),
            }),
          }),
          outputSchema: z.object({
            result4: z.string(),
          }),
          execute: vi.fn<any>().mockImplementation(async ({ inputData }) => ({
            result4: `final-${inputData.step1.result1}-${inputData.step2.result2}`,
          })),
        });

        const workflow = createWorkflow({
          id: 'consecutive-parallel-workflow',
          inputSchema: z.object({
            input: z.string(),
          }),
          outputSchema: z.object({
            result3: z.string(),
            result4: z.string(),
          }),
          steps: [step1, step2, step3, step4],
        });

        // This tests the fix: consecutive parallel calls should work with proper type inference
        workflow.parallel([step1, step2]).parallel([step3, step4]).commit();

        const mastra = new Mastra({
          workflows: { 'consecutive-parallel-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const result = await run.start({ inputData: { input: 'test-data' } });

        // Verify the final results
        expect(result.status).toBe('success');
        expect(result.steps.step1).toEqual({
          status: 'success',
          output: { result1: 'processed-test-data' },
          payload: { input: 'test-data' },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        expect(result.steps.step2).toEqual({
          status: 'success',
          output: { result2: 'transformed-test-data' },
          payload: { input: 'test-data' },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        expect(result.steps.step3).toEqual({
          status: 'success',
          output: { result3: 'combined-processed-test-data-transformed-test-data' },
          payload: {
            step1: { result1: 'processed-test-data' },
            step2: { result2: 'transformed-test-data' },
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        expect(result.steps.step4).toEqual({
          status: 'success',
          output: { result4: 'final-processed-test-data-transformed-test-data' },
          payload: {
            step1: { result1: 'processed-test-data' },
            step2: { result2: 'transformed-test-data' },
          },
          startedAt: expect.any(Number),
          endedAt: expect.any(Number),
        });
        await mastra.stopEventEngine();
      });
    });

    describe.sequential('Run count', () => {
      it('retryCount property should increment the run count when a step is executed multiple times', async () => {
        const repeatingStep = createStep({
          id: 'repeatingStep',
          inputSchema: z.object({}),
          outputSchema: z.object({
            count: z.number(),
          }),
          execute: async ({ retryCount }) => {
            return { count: retryCount };
          },
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: repeatingStep.outputSchema,
        })
          .dountil(repeatingStep, async ({ inputData }) => inputData.count === 3)
          .commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const result = await workflow.createRun().start({ inputData: {} });

        expect(result.status).toBe('success');
        expect(result.steps.repeatingStep).toHaveProperty('output', { count: 3 });
        await mastra.stopEventEngine();
      });

      it('multiple steps should have different run counts', async () => {
        const step1 = createStep({
          id: 'step1',
          inputSchema: z.object({}),
          outputSchema: z.object({
            count: z.number(),
          }),
          execute: async ({ retryCount }) => {
            return { count: retryCount };
          },
        });

        const step2 = createStep({
          id: 'step2',
          inputSchema: step1.outputSchema,
          outputSchema: z.object({
            count: z.number(),
          }),
          execute: async ({ retryCount }) => {
            return { count: retryCount };
          },
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        })
          .dowhile(step1, async ({ inputData }) => {
            return inputData.count < 3;
          })
          .dountil(step2, async ({ inputData }) => inputData.count === 10)
          .commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        const result = await run.start({ inputData: {} });

        expect(result.status).toBe('success');
        expect(result.steps.step1).toHaveProperty('output', { count: 3 });
        expect(result.steps.step2).toHaveProperty('output', { count: 10 });
        await mastra.stopEventEngine();
      });

      it('runCount should exist and equal zero for the first run', async () => {
        const mockExec = vi.fn().mockImplementation(async ({ retryCount }) => {
          return { count: retryCount };
        });
        const step = createStep({
          id: 'step',
          inputSchema: z.object({}),
          outputSchema: z.object({
            count: z.number(),
          }),
          execute: mockExec,
        });

        const workflow = createWorkflow({
          id: 'test-workflow',
          inputSchema: z.object({}),
          outputSchema: z.object({}),
        })
          .then(step)
          .commit();

        const mastra = new Mastra({
          workflows: { 'test-workflow': workflow },
          storage: testStorage,
          pubsub: new GoogleCloudPubSub({
            projectId: 'pubsub-test',
          }),
        });
        await mastra.startEventEngine();

        const run = await workflow.createRun();
        await run.start({ inputData: {} });

        expect(mockExec).toHaveBeenCalledTimes(1);
        expect(mockExec).toHaveBeenCalledWith(expect.objectContaining({ retryCount: 0 }));
        await mastra.stopEventEngine();
      });
    });
  },
  200e3,
);
